From 8e90b4447453d545bff92b3151d7209332709d5c Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Tue, 10 Jan 2023 18:20:23 +0800 Subject: [PATCH] mysql (ticdc): back port batch dml to release 6.1 (#7857) ref pingcap/tiflow#7653 --- cdc/entry/mounter_test.go | 181 ++++++ cdc/model/sink.go | 100 ++++ cdc/sink/mysql/mysql.go | 373 ++++++++++-- cdc/sink/mysql/mysql_params.go | 12 + cdc/sink/mysql/mysql_params_test.go | 2 + cdc/sink/mysql/mysql_test.go | 564 +++++++++++++++++- cdc/sink/mysql/mysql_worker.go | 19 +- cdc/sink/mysql/mysql_worker_test.go | 14 +- pkg/applier/redo_test.go | 12 +- pkg/sqlmodel/multivalue.go | 146 +++++ .../_utils/run_kafka_consumer | 5 +- .../batch_dml/conf/diff_config.toml | 29 + .../integration_tests/batch_dml/data/test.sql | 177 ++++++ .../batch_dml/data/test_finish.sql | 7 + .../batch_dml/data/test_v5.sql | 27 + tests/integration_tests/batch_dml/run.sh | 74 +++ 16 files changed, 1628 insertions(+), 114 deletions(-) create mode 100644 tests/integration_tests/batch_dml/conf/diff_config.toml create mode 100644 tests/integration_tests/batch_dml/data/test.sql create mode 100644 tests/integration_tests/batch_dml/data/test_finish.sql create mode 100644 tests/integration_tests/batch_dml/data/test_v5.sql create mode 100644 tests/integration_tests/batch_dml/run.sh diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 8385aad0f50..ac5eefeaefd 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -14,11 +14,20 @@ package entry import ( + "bytes" "context" "strings" "testing" "time" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/pingcap/log" ticonfig "github.com/pingcap/tidb/config" tidbkv "github.com/pingcap/tidb/kv" @@ -961,3 +970,175 @@ func TestGetDefaultZeroValue(t *testing.T) { require.Equal(t, tc.Default, val, tc.Name) } } + +func TestBuildTableInfo(t *testing.T) { + cases := []struct { + origin string + recovered string + recoveredWithNilCol string + }{ + { + "CREATE TABLE t1 (c INT PRIMARY KEY)", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) NOT NULL,\n" + + " PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) NOT NULL,\n" + + " PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + { + "CREATE TABLE t1 (" + + " c INT UNSIGNED," + + " c2 VARCHAR(10) NOT NULL," + + " c3 BIT(10) NOT NULL," + + " UNIQUE KEY (c2, c3)" + + ")", + // CDC discards field length. 
+ "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) unsigned DEFAULT NULL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `c3` bit(0) NOT NULL,\n" + + " UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `c3` bit(0) NOT NULL,\n" + + " UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + { + "CREATE TABLE t1 (" + + " c INT UNSIGNED," + + " gen INT AS (c+1) VIRTUAL," + + " c2 VARCHAR(10) NOT NULL," + + " gen2 INT AS (c+2) STORED," + + " c3 BIT(10) NOT NULL," + + " PRIMARY KEY (c, c2)" + + ")", + // CDC discards virtual generated column, and generating expression of stored generated column. + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) unsigned NOT NULL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" + + " `c3` bit(0) NOT NULL,\n" + + " PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `c` int(0) unsigned NOT NULL,\n" + + " `c2` varchar(0) NOT NULL,\n" + + " `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + { + "CREATE TABLE `t1` (" + + " `a` int(11) NOT NULL," + + " `b` int(11) DEFAULT NULL," + + " `c` int(11) DEFAULT NULL," + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */," + + " UNIQUE KEY `b` (`b`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `a` int(0) NOT NULL,\n" + + " `b` int(0) DEFAULT NULL,\n" + + " `c` int(0) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */,\n" + + " UNIQUE KEY `idx_1` (`b`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `a` int(0) NOT NULL,\n" + + " `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" + + " PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + } + p := parser.New() + for _, c := range cases { + stmt, err := p.ParseOneStmt(c.origin, "", "") + require.NoError(t, err) + originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) + require.NoError(t, err) + cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) + cols, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + require.NoError(t, err) + recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) + handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) + require.NotNil(t, handle.UniqueNotNullIdx) + require.Equal(t, c.recovered, showCreateTable(t, recoveredTI)) + + // mimic the columns are set to nil when old value feature is disabled + for i := range cols { + if !cols[i].Flag.IsHandleKey() { + cols[i] = nil + } + } + 
recoveredTI = model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) + handle = sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) + require.NotNil(t, handle.UniqueNotNullIdx) + require.Equal(t, c.recoveredWithNilCol, showCreateTable(t, recoveredTI)) + } +} + +var tiCtx = mock.NewContext() + +func showCreateTable(t *testing.T, ti *timodel.TableInfo) string { + result := bytes.NewBuffer(make([]byte, 0, 512)) + err := executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, result) + require.NoError(t, err) + return result.String() +} + +func TestNewDMRowChange(t *testing.T) { + cases := []struct { + origin string + recovered string + }{ + { + "CREATE TABLE t1 (id INT," + + " a1 INT NOT NULL," + + " a3 INT NOT NULL," + + " UNIQUE KEY dex1(a1, a3));", + "CREATE TABLE `BuildTiDBTableInfo` (\n" + + " `id` int(0) DEFAULT NULL,\n" + + " `a1` int(0) NOT NULL,\n" + + " `a3` int(0) NOT NULL,\n" + + " UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + }, + } + p := parser.New() + for _, c := range cases { + stmt, err := p.ParseOneStmt(c.origin, "", "") + require.NoError(t, err) + originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) + require.NoError(t, err) + cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) + cols := []*model.Column{ + { + Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil, + }, + { + Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil, + }, + { + Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil, + }, + } + recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) + require.Equal(t, c.recovered, showCreateTable(t, recoveredTI)) + tableName := &model.TableName{Schema: "db", Table: "t1"} + rowChange := sqlmodel.NewRowChange(tableName, nil, []interface{}{1, 1, 2}, nil, recoveredTI, nil, nil) + sqlGot, argsGot := rowChange.GenSQL(sqlmodel.DMLDelete) + require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", sqlGot) + require.Equal(t, []interface{}{1, 2}, argsGot) + + sqlGot, argsGot = sqlmodel.GenDeleteSQL(rowChange, rowChange) + require.Equal(t, "DELETE FROM `db`.`t1` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", sqlGot) + require.Equal(t, []interface{}{1, 2, 1, 2}, argsGot) + } +} diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 6d861d21838..2e18678ee16 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -19,6 +19,8 @@ import ( "sync" "unsafe" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/log" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/rowcodec" @@ -438,6 +440,104 @@ type RedoColumn struct { Flag uint64 `msg:"flag"` } +// BuildTiDBTableInfo builds a TiDB TableInfo from given information. 
+func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo { + ret := &model.TableInfo{} + // nowhere will use this field, so we set a debug message + ret.Name = model.NewCIStr("BuildTiDBTableInfo") + + for i, col := range columns { + columnInfo := &model.ColumnInfo{ + Offset: i, + State: model.StatePublic, + } + if col == nil { + // by referring to datum2Column, nil is happened when + // - !IsColCDCVisible, which means the column is a virtual generated + // column + // - !exist && !fillWithDefaultValue, which means upstream does not + // send the column value + // just mock for the first case + columnInfo.Name = model.NewCIStr("omitted") + columnInfo.GeneratedExprString = "pass_generated_check" + columnInfo.GeneratedStored = false + ret.Columns = append(ret.Columns, columnInfo) + continue + } + columnInfo.Name = model.NewCIStr(col.Name) + columnInfo.SetType(col.Type) + // TiKV always use utf8mb4 to store, and collation is not recorded by CDC + columnInfo.SetCharset(mysql.UTF8MB4Charset) + columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation) + + // inverse initColumnsFlag + flag := col.Flag + if flag.IsBinary() { + columnInfo.SetCharset("binary") + } + if flag.IsGeneratedColumn() { + // we do not use this field, so we set it to any non-empty string + columnInfo.GeneratedExprString = "pass_generated_check" + columnInfo.GeneratedStored = true + } + if flag.IsHandleKey() { + columnInfo.AddFlag(mysql.PriKeyFlag) + ret.IsCommonHandle = true + } else if flag.IsPrimaryKey() { + columnInfo.AddFlag(mysql.PriKeyFlag) + } + if flag.IsUniqueKey() { + columnInfo.AddFlag(mysql.UniqueKeyFlag) + } + if !flag.IsNullable() { + columnInfo.AddFlag(mysql.NotNullFlag) + } + if flag.IsMultipleKey() { + columnInfo.AddFlag(mysql.MultipleKeyFlag) + } + if flag.IsUnsigned() { + columnInfo.AddFlag(mysql.UnsignedFlag) + } + ret.Columns = append(ret.Columns, columnInfo) + } + + for i, colOffsets := range indexColumns { + indexInfo := &model.IndexInfo{ + Name: model.NewCIStr(fmt.Sprintf("idx_%d", i)), + State: model.StatePublic, + } + firstCol := columns[colOffsets[0]] + if firstCol == nil { + // when the referenced column is nil, we already have a handle index, + // so we can skip this index. 
+ // only happens for DELETE event and old value feature is disabled + continue + } + if firstCol.Flag.IsPrimaryKey() { + indexInfo.Primary = true + indexInfo.Unique = true + } + if firstCol.Flag.IsUniqueKey() { + indexInfo.Unique = true + } + + for _, offset := range colOffsets { + col := ret.Columns[offset] + + indexCol := &model.IndexColumn{} + indexCol.Name = col.Name + indexCol.Offset = offset + indexInfo.Columns = append(indexInfo.Columns, indexCol) + } + + // TODO: revert the "all column set index related flag" to "only the + // first column set index related flag" if needed + + ret.Indices = append(ret.Indices, indexInfo) + } + return ret +} + // ColumnValueString returns the string representation of the column value func ColumnValueString(c interface{}) string { var data string diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index f142b4b51a6..d7229081612 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "github.com/pingcap/tiflow/pkg/sqlmodel" + dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -741,6 +743,210 @@ func (s *mysqlSink) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDM retry.WithIsRetryableErr(isRetryableDMLError)) } +// convert2RowChanges is a helper function that convert the row change representation +// of CDC into a general one. +func convert2RowChanges( + row *model.RowChangedEvent, + tableInfo *timodel.TableInfo, + changeType sqlmodel.RowChangeType, +) *sqlmodel.RowChange { + preValues := make([]interface{}, 0, len(row.PreColumns)) + for _, col := range row.PreColumns { + if col == nil { + // will not use this value, just append a dummy value + preValues = append(preValues, "omitted value") + continue + } + preValues = append(preValues, col.Value) + } + + postValues := make([]interface{}, 0, len(row.Columns)) + for _, col := range row.Columns { + if col == nil { + postValues = append(postValues, "omitted value") + continue + } + postValues = append(postValues, col.Value) + } + + var res *sqlmodel.RowChange + + switch changeType { + case sqlmodel.RowChangeInsert: + res = sqlmodel.NewRowChange( + row.Table, + nil, + nil, + postValues, + tableInfo, + nil, nil) + case sqlmodel.RowChangeUpdate: + res = sqlmodel.NewRowChange( + row.Table, + nil, + preValues, + postValues, + tableInfo, + nil, nil) + case sqlmodel.RowChangeDelete: + res = sqlmodel.NewRowChange( + row.Table, + nil, + preValues, + nil, + tableInfo, + nil, nil) + } + return res +} + +func convertBinaryToString(row *model.RowChangedEvent) { + for i, col := range row.Columns { + if col == nil { + continue + } + if col.Charset != "" && col.Charset != charset.CharsetBin { + colValBytes, ok := col.Value.([]byte) + if ok { + row.Columns[i].Value = string(colValBytes) + } + } + } +} + +// TODO: Find a way to make batch delete dmls more efficient. 
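Taken together, these helpers let the sink hand CDC rows to pkg/sqlmodel. A minimal sketch of how they compose, for orientation only — `toRowChange` is an invented name, and the real code inlines this logic in `prepareDMLs` below:

```go
// Illustrative sketch, not part of the patch: one CDC row event becomes a
// generic sqlmodel.RowChange via a TableInfo recovered from column metadata.
func toRowChange(row *model.RowChangedEvent) *sqlmodel.RowChange {
	cols := row.Columns
	changeType := sqlmodel.RowChangeInsert
	switch {
	case row.IsDelete():
		cols = row.PreColumns
		changeType = sqlmodel.RowChangeDelete
	case row.IsUpdate():
		changeType = sqlmodel.RowChangeUpdate
	}
	// CDC does not carry the full upstream schema; BuildTiDBTableInfo
	// reconstructs just enough of it (names, types, index layout) for SQL
	// generation, discarding field lengths and collations.
	tableInfo := model.BuildTiDBTableInfo(cols, row.IndexColumns)
	return convert2RowChanges(row, tableInfo, changeType)
}
```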
+func (s *mysqlSink) groupRowsByType( + singleTxnDMLs []*model.RowChangedEvent, + tableInfo *timodel.TableInfo, + spiltUpdate bool, +) (insertRows, updateRows, deleteRows [][]*sqlmodel.RowChange) { + preAllocateSize := len(singleTxnDMLs) + if preAllocateSize > s.params.maxTxnRow { + preAllocateSize = s.params.maxTxnRow + } + + insertRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + updateRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) + + for _, row := range singleTxnDMLs { + convertBinaryToString(row) + + if row.IsInsert() { + insertRow = append( + insertRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) + if len(insertRow) >= s.params.maxTxnRow { + insertRows = append(insertRows, insertRow) + insertRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } + } + + if row.IsDelete() { + deleteRow = append( + deleteRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) + if len(deleteRow) >= s.params.maxTxnRow { + deleteRows = append(deleteRows, deleteRow) + deleteRow = make([]*sqlmodel.RowChange, 0, preAllocateSize) + } + } + + if row.IsUpdate() { + if spiltUpdate { + deleteRow = append( + deleteRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeDelete)) + if len(deleteRow) >= s.params.maxTxnRow { + deleteRows = append(deleteRows, deleteRow) + deleteRow = make([]*sqlmodel.RowChange, 0, s.params.maxTxnRow) + } + insertRow = append( + insertRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeInsert)) + if len(insertRow) >= s.params.maxTxnRow { + insertRows = append(insertRows, insertRow) + insertRow = make([]*sqlmodel.RowChange, 0, s.params.maxTxnRow) + } + } else { + updateRow = append( + updateRow, + convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate)) + if len(updateRow) >= s.params.maxTxnRow { + updateRows = append(updateRows, updateRow) + updateRow = make([]*sqlmodel.RowChange, 0, s.params.maxTxnRow) + } + } + } + } + + if len(insertRow) > 0 { + insertRows = append(insertRows, insertRow) + } + if len(updateRow) > 0 { + updateRows = append(updateRows, updateRow) + } + if len(deleteRow) > 0 { + deleteRows = append(deleteRows, deleteRow) + } + + return +} + +func (s *mysqlSink) batchSingleTxnDmls( + singleTxnDMLs []*model.RowChangedEvent, + tableInfo *timodel.TableInfo, + translateToInsert bool, +) (sqls []string, values [][]interface{}) { + insertRows, updateRows, deleteRows := s.groupRowsByType(singleTxnDMLs, tableInfo, !translateToInsert) + + if len(deleteRows) > 0 { + for _, rows := range deleteRows { + sql, value := sqlmodel.GenDeleteSQL(rows...) + sqls = append(sqls, sql) + values = append(values, value) + } + } + + // handle insert + if len(insertRows) > 0 { + for _, rows := range insertRows { + if translateToInsert { + sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLInsert, rows...) + sqls = append(sqls, sql) + values = append(values, value) + } else { + sql, value := sqlmodel.GenInsertSQL(sqlmodel.DMLReplace, rows...) + sqls = append(sqls, sql) + values = append(values, value) + } + } + } + + // handle update + if len(updateRows) > 0 { + for _, rows := range updateRows { + sql, value := sqlmodel.GenUpdateSQL(rows...) 
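+			// sql now holds one multi-row UPDATE for the whole batch, built
+			// from CASE ... WHEN expressions keyed by the PK/UK values; see
+			// GenUpdateSQL in pkg/sqlmodel/multivalue.go later in this patch.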
+ sqls = append(sqls, sql) + values = append(values, value) + } + } + + return +} + +func hasHandleKey(cols []*model.Column) bool { + for _, col := range cols { + if col == nil { + continue + } + if col.Flag.IsHandleKey() { + return true + } + } + return false +} + type preparedDMLs struct { sqls []string values [][]interface{} @@ -749,22 +955,17 @@ type preparedDMLs struct { } // prepareDMLs converts model.RowChangedEvent list to query string list and args list -func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, bucket int) *preparedDMLs { - sqls := make([]string, 0, len(rows)) - values := make([][]interface{}, 0, len(rows)) +func (s *mysqlSink) prepareDMLs(txns []*model.SingleTableTxn, replicaID uint64, bucket int) *preparedDMLs { + inRowCount := 0 + for _, txn := range txns { + inRowCount += len(txn.Rows) + } + + sqls := make([]string, 0, inRowCount) + values := make([][]interface{}, 0, inRowCount) replaces := make(map[string][][]interface{}) + rowCount := 0 - // translateToInsert control the update and insert behavior - translateToInsert := s.params.enableOldValue && !s.params.safeMode - for _, row := range rows { - if !translateToInsert { - break - } - // It can be translated in to INSERT, if the row is committed after - // we starting replicating the table, which means it must not be - // replicated before, and there is no such row in downstream MySQL. - translateToInsert = row.CommitTs > row.ReplicatingTs - } // flush cached batch replace or insert, to keep the sequence of DMLs flushCacheDMLs := func() { @@ -776,77 +977,117 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, } } - for _, row := range rows { - var query string - var args []interface{} - quoteTable := quotes.QuoteSchema(row.Table.Schema, row.Table.Table) + // translateToInsert control the update and insert behavior + translateToInsert := s.params.enableOldValue && !s.params.safeMode + for _, txn := range txns { + if len(txn.Rows) == 0 { + continue + } - // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. - // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. - if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 { - flushCacheDMLs() - query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns, s.forceReplicate) - if query != "" { - sqls = append(sqls, query) - values = append(values, args) - rowCount++ + firstRow := txn.Rows[0] + + // A row can be translated in to INSERT, when it was committed after + // the table it belongs to been replicating by TiCDC, which means it must not be + // replicated before, and there is no such row in downstream MySQL. + for _, row := range txn.Rows { + if !translateToInsert { + break } - continue + // It can be translated in to INSERT, if the row is committed after + // we starting replicating the table, which means it must not be + // replicated before, and there is no such row in downstream MySQL. + translateToInsert = row.CommitTs > row.ReplicatingTs } - // Case for update event or delete event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a DELETE SQL here. - // For delete event: - // It will be translated directly into a DELETE SQL. 
- if len(row.PreColumns) != 0 { - flushCacheDMLs() - query, args = prepareDelete(quoteTable, row.PreColumns, s.forceReplicate) - if query != "" { - sqls = append(sqls, query) - values = append(values, args) - rowCount++ + // Determine whether to use batch dml feature here. + if s.params.batchDMLEnable { + tableColumns := firstRow.Columns + if firstRow.IsDelete() { + tableColumns = firstRow.PreColumns + } + // only use batch dml when the table has a handle key + if hasHandleKey(tableColumns) { + rowCount += len(txn.Rows) + // TODO(dongmen): find a better way to get table info. + tableInfo := model.BuildTiDBTableInfo(tableColumns, firstRow.IndexColumns) + sql, value := s.batchSingleTxnDmls(txn.Rows, tableInfo, translateToInsert) + sqls = append(sqls, sql...) + values = append(values, value...) + continue } } - // Case for update event or insert event. - // For update event: - // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. - // So we will prepare a REPLACE SQL here. - // For insert event: - // It will be translated directly into a - // INSERT(old value is enabled and not in safe mode) - // or REPLACE(old value is disabled or in safe mode) SQL. - if len(row.Columns) != 0 { - if s.params.batchReplaceEnabled { - query, args = prepareReplace(quoteTable, row.Columns, false /* appendPlaceHolder */, translateToInsert) + for _, row := range txn.Rows { + quoteTable := row.Table.QuoteString() + var query string + var args []interface{} + // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. + // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. + if translateToInsert && len(row.PreColumns) != 0 && len(row.Columns) != 0 { + flushCacheDMLs() + query, args = prepareUpdate(quoteTable, row.PreColumns, row.Columns, s.forceReplicate) if query != "" { - if _, ok := replaces[query]; !ok { - replaces[query] = make([][]interface{}, 0) - } - replaces[query] = append(replaces[query], args) + sqls = append(sqls, query) + values = append(values, args) rowCount++ } - } else { - query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) + continue + } + + // Case for update event or delete event. + // For update event: + // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. + // So we will prepare a DELETE SQL here. + // For delete event: + // It will be translated directly into a DELETE SQL. + if len(row.PreColumns) != 0 { + flushCacheDMLs() + query, args = prepareDelete(quoteTable, row.PreColumns, s.forceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) rowCount++ } } + + // Case for update event or insert event. + // For update event: + // If old value is disabled or in safe mode, update will be translated to DELETE + REPLACE SQL. + // So we will prepare a REPLACE SQL here. + // For insert event: + // It will be translated directly into a + // INSERT(old value is enabled and not in safe mode) + // or REPLACE(old value is disabled or in safe mode) SQL. 
+ if len(row.Columns) != 0 { + if s.params.batchReplaceEnabled { + query, args = prepareReplace(quoteTable, row.Columns, false /* appendPlaceHolder */, translateToInsert) + if query != "" { + if _, ok := replaces[query]; !ok { + replaces[query] = make([][]interface{}, 0) + } + replaces[query] = append(replaces[query], args) + rowCount++ + } + } else { + query, args = prepareReplace(quoteTable, row.Columns, true /* appendPlaceHolder */, translateToInsert) + if query != "" { + sqls = append(sqls, query) + values = append(values, args) + rowCount++ + } + } + } } } flushCacheDMLs() - dmls := &preparedDMLs{ sqls: sqls, values: values, } - if s.cyclic != nil && len(rows) > 0 { + + if s.cyclic != nil && inRowCount > 0 { // Write mark table with the current replica ID. - row := rows[0] + row := txns[0].Rows[0] updateMark := s.cyclic.UdpateSourceTableCyclicMark( row.Table.Schema, row.Table.Table, uint64(bucket), replicaID, row.StartTs) dmls.markSQL = updateMark @@ -857,7 +1098,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, return dmls } -func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error { +func (s *mysqlSink) execDMLs(ctx context.Context, txns []*model.SingleTableTxn, replicaID uint64, bucket int) error { failpoint.Inject("SinkFlushDMLPanic", func() { time.Sleep(time.Second) log.Fatal("SinkFlushDMLPanic") @@ -868,9 +1109,11 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, time.Sleep(time.Second * 2) failpoint.Return(errors.Trace(dmysql.ErrInvalidConn)) }) - s.statistics.ObserveRows(rows...) - dmls := s.prepareDMLs(rows, replicaID, bucket) - log.Debug("prepare DMLs", zap.Any("rows", rows), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) + for _, txn := range txns { + s.statistics.ObserveRows(txn.Rows...) 
+ } + dmls := s.prepareDMLs(txns, replicaID, bucket) + log.Debug("prepare DMLs", zap.Any("txns", txns), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values)) if err := s.execDMLWithMaxRetries(ctx, dmls, bucket); err != nil { log.Error("execute DMLs failed", zap.String("err", err.Error())) return errors.Trace(err) diff --git a/cdc/sink/mysql/mysql_params.go b/cdc/sink/mysql/mysql_params.go index b2ae915ca1b..9f2eb3b46d7 100644 --- a/cdc/sink/mysql/mysql_params.go +++ b/cdc/sink/mysql/mysql_params.go @@ -53,6 +53,7 @@ const ( defaultSafeMode = false defaultTxnIsolationRC = "READ-COMMITTED" defaultCharacterSet = "utf8mb4" + defaultBatchDMLEnable = true ) var ( @@ -70,6 +71,7 @@ var defaultParams = &sinkParams{ writeTimeout: defaultWriteTimeout, dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, + batchDMLEnable: defaultBatchDMLEnable, } var validSchemes = map[string]bool{ @@ -94,6 +96,7 @@ type sinkParams struct { safeMode bool timezone string tls string + batchDMLEnable bool } func (s *sinkParams) Clone() *sinkParams { @@ -254,6 +257,15 @@ func parseSinkURIToParams(ctx context.Context, params.dialTimeout = s } + s = sinkURI.Query().Get("batch-dml-enable") + if s != "" { + enable, err := strconv.ParseBool(s) + if err != nil { + return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + params.batchDMLEnable = enable + } + return params, nil } diff --git a/cdc/sink/mysql/mysql_params_test.go b/cdc/sink/mysql/mysql_params_test.go index ecf38c00e87..b92ac5edc81 100644 --- a/cdc/sink/mysql/mysql_params_test.go +++ b/cdc/sink/mysql/mysql_params_test.go @@ -44,6 +44,7 @@ func TestSinkParamsClone(t *testing.T) { writeTimeout: defaultWriteTimeout, dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, + batchDMLEnable: defaultBatchDMLEnable, }, param1) require.Equal(t, &sinkParams{ changefeedID: model.DefaultChangeFeedID("123"), @@ -56,6 +57,7 @@ func TestSinkParamsClone(t *testing.T) { writeTimeout: defaultWriteTimeout, dialTimeout: defaultDialTimeout, safeMode: defaultSafeMode, + batchDMLEnable: defaultBatchDMLEnable, }, param2) } diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 9a803a73c9e..912d0132405 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -50,6 +50,7 @@ func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink { require.Nil(t, err) params := defaultParams.Clone() params.batchReplaceEnabled = false + params.batchDMLEnable = false return &mysqlSink{ txnCache: newUnresolvedTxnCache(), filter: f, @@ -123,7 +124,12 @@ func TestPrepareDML(t *testing.T) { defer cancel() ms := newMySQLSink4Test(ctx, t) for _, tc := range testCases { - dmls := ms.prepareDMLs(tc.input, 0, 0) + txns := make([]*model.SingleTableTxn, 0) + txn := &model.SingleTableTxn{ + Rows: tc.input, + } + txns = append(txns, txn) + dmls := ms.prepareDMLs(txns, 0, 0) require.Equal(t, tc.expected, dmls) } } @@ -844,7 +850,7 @@ func TestReduceReplace(t *testing.T) { }{ { replaces: map[string][][]interface{}{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES ": { + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ": { []interface{}{1, "1"}, []interface{}{2, "2"}, []interface{}{3, "3"}, @@ -853,9 +859,9 @@ func TestReduceReplace(t *testing.T) { batchSize: 1, sort: false, expectSQLs: []string{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)", - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)", - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) 
VALUES (?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)", }, expectArgs: [][]interface{}{ {1, "1"}, @@ -865,7 +871,7 @@ func TestReduceReplace(t *testing.T) { }, { replaces: map[string][][]interface{}{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES ": { + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ": { []interface{}{1, "1"}, []interface{}{2, "2"}, []interface{}{3, "3"}, @@ -876,8 +882,8 @@ func TestReduceReplace(t *testing.T) { batchSize: 3, sort: false, expectSQLs: []string{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?),(?,?),(?,?)", - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?),(?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?),(?,?),(?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?),(?,?)", }, expectArgs: [][]interface{}{ {1, "1", 2, "2", 3, "3"}, @@ -886,7 +892,7 @@ func TestReduceReplace(t *testing.T) { }, { replaces: map[string][][]interface{}{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES ": { + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ": { []interface{}{1, "1"}, []interface{}{2, "2"}, []interface{}{3, "3"}, @@ -897,7 +903,7 @@ func TestReduceReplace(t *testing.T) { batchSize: 10, sort: false, expectSQLs: []string{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?),(?,?),(?,?),(?,?),(?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?),(?,?),(?,?),(?,?),(?,?)", }, expectArgs: [][]interface{}{ {1, "1", 2, "2", 3, "3", 4, "3", 5, "5"}, @@ -905,7 +911,7 @@ func TestReduceReplace(t *testing.T) { }, { replaces: map[string][][]interface{}{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES ": { + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ": { []interface{}{1, "1"}, []interface{}{2, "2"}, []interface{}{3, "3"}, @@ -913,7 +919,7 @@ func TestReduceReplace(t *testing.T) { []interface{}{5, "5"}, []interface{}{6, "6"}, }, - "REPLACE INTO `test`.`t2`(`a`,`b`) VALUES ": { + "REPLACE INTO `test`.`t2` (`a`,`b`) VALUES ": { []interface{}{7, ""}, []interface{}{8, ""}, []interface{}{9, ""}, @@ -922,9 +928,9 @@ func TestReduceReplace(t *testing.T) { batchSize: 3, sort: true, expectSQLs: []string{ - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?),(?,?),(?,?)", - "REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?),(?,?),(?,?)", - "REPLACE INTO `test`.`t2`(`a`,`b`) VALUES (?,?),(?,?),(?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?),(?,?),(?,?)", + "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?),(?,?),(?,?)", + "REPLACE INTO `test`.`t2` (`a`,`b`) VALUES (?,?),(?,?),(?,?)", }, expectArgs: [][]interface{}{ {1, "1", 2, "2", 3, "3"}, @@ -1118,12 +1124,12 @@ func TestNewMySQLSinkExecDML(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). + mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) mock.ExpectCommit() mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)"). + mock.ExpectExec("INSERT INTO `s1`.`t2` (`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) mock.ExpectCommit() @@ -1330,7 +1336,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). 
WillReturnError(errDatabaseNotExists) mock.ExpectRollback() @@ -1356,7 +1362,12 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) { sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + txns := []*model.SingleTableTxn{ + { + Rows: rows, + }, + } + err = sink.execDMLs(ctx, txns, 1 /* replicaID */, 1 /* bucket */) require.Equal(t, errDatabaseNotExists, errors.Cause(err)) err = sink.Close(ctx) @@ -1408,7 +1419,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errTableNotExists) mock.ExpectRollback() @@ -1434,7 +1445,12 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) { sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + txns := []*model.SingleTableTxn{ + { + Rows: rows, + }, + } + err = sink.execDMLs(ctx, txns, 1 /* replicaID */, 1 /* bucket */) require.Equal(t, errTableNotExists, errors.Cause(err)) err = sink.Close(ctx) @@ -1487,7 +1503,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { require.Nil(t, err) for i := 0; i < int(defaultDMLMaxRetry); i++ { mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?),(?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?),(?)"). WithArgs(1, 2). WillReturnError(errLockDeadlock) mock.ExpectRollback() @@ -1517,7 +1533,12 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) { sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */) + txns := []*model.SingleTableTxn{ + { + Rows: rows, + }, + } + err = sink.execDMLs(ctx, txns, 1 /* replicaID */, 1 /* bucket */) require.Equal(t, errLockDeadlock, errors.Cause(err)) err = sink.Close(ctx) @@ -1558,7 +1579,7 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`) VALUES (?)"). + mock.ExpectExec("INSERT INTO `s1`.`t1` (`a`) VALUES (?)"). WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit(). @@ -1588,7 +1609,12 @@ func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) { ctx, model.DefaultChangeFeedID(changefeed), sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - err = sink.execDMLs(ctx, rows, 0, 1 /* bucket */) + txns := []*model.SingleTableTxn{ + { + Rows: rows, + }, + } + err = sink.execDMLs(ctx, txns, 0, 1 /* bucket */) require.Equal(t, errDup, errors.Cause(err)) err = sink.Close(ctx) @@ -1871,12 +1897,12 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { // normal db db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`) VALUES (?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`) VALUES (?)"). WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t2`(`a`) VALUES (?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t2` (`a`) VALUES (?)"). WithArgs(1). 
WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() @@ -2075,7 +2101,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)"). + mock.ExpectExec("REPLACE INTO `s1`.`t1` (`a`,`b`) VALUES (?,?)"). WillDelayFor(1 * time.Second). WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable}) return db, nil @@ -2493,7 +2519,487 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { ms.params.safeMode = false ms.params.enableOldValue = true for _, tc := range testCases { - dmls := ms.prepareDMLs(tc.input, 0, 0) + txns := []*model.SingleTableTxn{ + { + Rows: tc.input, + }, + } + dmls := ms.prepareDMLs(txns, 0, 0) require.Equal(t, tc.expected, dmls, tc.name) } } + +func TestPrepareBatchDMLs(t *testing.T) { + t.Parallel() + testCases := []struct { + input []*model.RowChangedEvent + expected *preparedDMLs + }{ + // empty event + { + input: []*model.RowChangedEvent{}, + expected: &preparedDMLs{ + sqls: []string{}, + values: [][]interface{}{}, + }, + }, + { // delete event + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))"}, + values: [][]interface{}{{1, 1, 2, 2}}, + rowCount: 2, + }, + }, + { // insert event + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 1}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` 
(`a1`,`a3`) VALUES (?,?),(?,?)"}, + values: [][]interface{}{{1, 1, 2, 2}}, + rowCount: 2, + }, + }, + // update event + { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 3, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 3, + }}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 4, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 4, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{"UPDATE `common_1`.`uk_without_pk` SET `a1`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? END, `a3`=CASE WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? WHEN ROW(`a1`,`a3`)=ROW(?,?) THEN ? 
END WHERE ROW(`a1`,`a3`) IN (ROW(?,?),ROW(?,?))"}, + values: [][]interface{}{{1, 1, 2, 3, 3, 4, 1, 1, 2, 3, 3, 4, 1, 1, 3, 3}}, + rowCount: 2, + }, + }, + // mixed event + { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + expected: &preparedDMLs{ + sqls: []string{ + "DELETE FROM `common_1`.`uk_without_pk` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", + "INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)", + }, + values: [][]interface{}{{1, 1, 2, 2}, {2, 2}}, + rowCount: 3, + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ms := newMySQLSink4Test(ctx, t) + ms.params.batchDMLEnable = true + ms.params.safeMode = false + ms.params.enableOldValue = true + for _, tc := range testCases { + txns := []*model.SingleTableTxn{{Rows: tc.input}} + dmls := ms.prepareDMLs(txns, 1, 1) + require.Equal(t, tc.expected, dmls) + } +} + +func TestGroupRowsByType(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ms := newMySQLSink4Test(ctx, t) + testCases := []struct { + input []*model.RowChangedEvent + maxTxnRow int + }{ + { // delete event + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", 
+ Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + { + StartTs: 418658114257813514, + CommitTs: 418658114257813515, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + PreColumns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.UniqueKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{1, 2}}, + }, + }, + maxTxnRow: 2, + }, + { // insert event + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }}, + IndexColumns: [][]int{{1, 1}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | 
model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + Table: &model.TableName{Schema: "common_1", Table: "uk_without_pk"}, + Columns: []*model.Column{nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }, { + Name: "a3", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | + model.HandleKeyFlag | model.HandleKeyFlag, + Value: 2, + }}, + IndexColumns: [][]int{{2, 2}}, + }, + }, + maxTxnRow: 4, + }, + } + for _, tc := range testCases { + colums := tc.input[0].Columns + if len(colums) == 0 { + colums = tc.input[0].PreColumns + } + tableInfo := model.BuildTiDBTableInfo(colums, tc.input[0].IndexColumns) + ms.params.maxTxnRow = tc.maxTxnRow + inserts, updates, deletes := ms.groupRowsByType(tc.input, tableInfo, false) + for _, rows := range inserts { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + for _, rows := range updates { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + for _, rows := range deletes { + require.LessOrEqual(t, len(rows), tc.maxTxnRow) + } + } +} diff --git a/cdc/sink/mysql/mysql_worker.go b/cdc/sink/mysql/mysql_worker.go index 34500f13a3b..b501d8a3876 100644 --- a/cdc/sink/mysql/mysql_worker.go +++ b/cdc/sink/mysql/mysql_worker.go @@ -32,7 +32,7 @@ type mysqlSinkWorker struct { txnCh chan *model.SingleTableTxn maxTxnRow int bucket int - execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error + execDMLs func(context.Context, []*model.SingleTableTxn, uint64, int) error metricBucketSize prometheus.Counter receiver *notify.Receiver closedCh chan struct{} @@ -44,7 +44,7 @@ func newMySQLSinkWorker( bucket int, metricBucketSize prometheus.Counter, receiver *notify.Receiver, - execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error, + execDMLs func(context.Context, []*model.SingleTableTxn, uint64, int) error, ) *mysqlSinkWorker { return &mysqlSinkWorker{ txnCh: make(chan *model.SingleTableTxn, 1024), @@ -82,9 +82,10 @@ func (w *mysqlSinkWorker) isNormal() bool { func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { var ( - toExecRows []*model.RowChangedEvent + toExecTxns []*model.SingleTableTxn replicaID uint64 txnNum int + rowNum int ) // mark FinishWg before worker exits, all data txns can be omitted. 
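The hunk below completes this change: the worker buffers whole transactions (`toExecTxns`) instead of flattened rows, and tracks the buffered row count separately in `rowNum`, so the `maxTxnRow` flush threshold is still measured in rows while transaction boundaries survive into `execDMLs`, where the per-transaction batching happens. A schematic of the flush condition, with assumed numbers:

```go
// Schematic only: with maxTxnRow = 4 and two buffered txns of 2 and 1 rows,
// an incoming 2-row txn flushes the buffer first (3+2 > 4) and then starts
// a new one; a transaction is never split across flushes.
rowNum, maxTxnRow := 2+1, 4
incoming := 2
flushFirst := rowNum+incoming > maxTxnRow // true
_ = flushFirst
```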
@@ -112,17 +113,18 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { }() flushRows := func() error { - if len(toExecRows) == 0 { + if len(toExecTxns) == 0 { return nil } - err := w.execDMLs(ctx, toExecRows, replicaID, w.bucket) + err := w.execDMLs(ctx, toExecTxns, replicaID, w.bucket) if err != nil { txnNum = 0 return err } - toExecRows = toExecRows[:0] + toExecTxns = toExecTxns[:0] w.metricBucketSize.Add(float64(txnNum)) txnNum = 0 + rowNum = 0 return nil } @@ -143,7 +145,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { txn.FinishWg.Done() continue } - if txn.ReplicaID != replicaID || len(toExecRows)+len(txn.Rows) > w.maxTxnRow { + if txn.ReplicaID != replicaID || rowNum+len(txn.Rows) > w.maxTxnRow { if err := flushRows(); err != nil { w.hasError.Store(true) txnNum++ @@ -151,7 +153,8 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) { } } replicaID = txn.ReplicaID - toExecRows = append(toExecRows, txn.Rows...) + toExecTxns = append(toExecTxns, txn) + rowNum += len(txn.Rows) txnNum++ case <-w.receiver.C: if err := flushRows(); err != nil { diff --git a/cdc/sink/mysql/mysql_worker_test.go b/cdc/sink/mysql/mysql_worker_test.go index 50a1d95934b..cbd7b89dd52 100644 --- a/cdc/sink/mysql/mysql_worker_test.go +++ b/cdc/sink/mysql/mysql_worker_test.go @@ -181,9 +181,13 @@ func TestMysqlSinkWorker(t *testing.T) { metrics.BucketSizeCounter. WithLabelValues("default", "changefeed", "1"), receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { - rows := make([]*model.RowChangedEvent, len(events)) - copy(rows, events) + func(ctx context.Context, events []*model.SingleTableTxn, replicaID uint64, bucket int) error { + rows := make([]*model.RowChangedEvent, 0) + for _, event := range events { + if len(event.Rows) != 0 { + rows = append(rows, event.Rows...) + } + } outputRows = append(outputRows, rows) outputReplicaIDs = append(outputReplicaIDs, replicaID) return nil @@ -264,7 +268,7 @@ func TestMySQLSinkWorkerExitWithError(t *testing.T) { metrics. BucketSizeCounter.WithLabelValues("default", "changefeed", "1"), receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { + func(ctx context.Context, events []*model.SingleTableTxn, replicaID uint64, bucket int) error { return errExecFailed }) errg, cctx := errgroup.WithContext(cctx) @@ -341,7 +345,7 @@ func TestMySQLSinkWorkerExitCleanup(t *testing.T) { metrics. BucketSizeCounter.WithLabelValues("default", "changefeed", "1"), receiver, - func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error { + func(ctx context.Context, events []*model.SingleTableTxn, replicaID uint64, bucket int) error { return errExecFailed }) errg, cctx := errgroup.WithContext(cctx) diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 6a24682377f..1329f63c1fb 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -156,16 +156,16 @@ func TestApplyDMLs(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)"). + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(1, "2"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`t1` WHERE `a` = ? LIMIT 1;"). + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`) IN ((?))"). 
WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectExec("REPLACE INTO `test`.`t1`(`a`,`b`) VALUES (?,?)"). + mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(2, "3"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() @@ -198,6 +198,7 @@ func TestApplyDMLs(t *testing.T) { Flag: 0, }, }, + IndexColumns: [][]int{{0}}, }, { StartTs: 1200, @@ -207,7 +208,7 @@ func TestApplyDMLs(t *testing.T) { { Name: "a", Value: 1, - Flag: model.HandleKeyFlag, + Flag: model.HandleKeyFlag | model.UniqueKeyFlag | model.PrimaryKeyFlag, }, { Name: "b", Value: "2", @@ -218,13 +219,14 @@ func TestApplyDMLs(t *testing.T) { { Name: "a", Value: 2, - Flag: model.HandleKeyFlag, + Flag: model.HandleKeyFlag | model.UniqueKeyFlag | model.PrimaryKeyFlag, }, { Name: "b", Value: "3", Flag: 0, }, }, + IndexColumns: [][]int{{0}}, }, } for _, dml := range dmls { diff --git a/pkg/sqlmodel/multivalue.go b/pkg/sqlmodel/multivalue.go index 8c77387b276..b1dc05ab381 100644 --- a/pkg/sqlmodel/multivalue.go +++ b/pkg/sqlmodel/multivalue.go @@ -16,6 +16,12 @@ package sqlmodel import ( "strings" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/format" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/opcode" + driver "github.com/pingcap/tidb/types/parser_driver" + "go.uber.org/zap" "github.com/pingcap/tiflow/dm/pkg/log" @@ -200,3 +206,143 @@ func GenInsertSQL(tp DMLType, changes ...*RowChange) (string, []interface{}) { } return buf.String(), args } + +// GenUpdateSQL generates the UPDATE SQL and its arguments. +// Input `changes` should have same target table and same columns for WHERE +// (typically same PK/NOT NULL UK), otherwise the behaviour is undefined. +// Compared to GenInsertSQL with DMLInsertOnDuplicateUpdate, this function is +// slower and more complex, we should only use it when PK/UK is updated. +func GenUpdateSQL(changes ...*RowChange) (string, []interface{}) { + if len(changes) == 0 { + log.L().DPanic("row changes is empty") + return "", nil + } + + stmt := &ast.UpdateStmt{} + first := changes[0] + + // handle UPDATE db.tbl ... + + t := &ast.TableName{ + Schema: model.NewCIStr(first.targetTable.Schema), + Name: model.NewCIStr(first.targetTable.Table), + } + stmt.TableRefs = &ast.TableRefsClause{TableRefs: &ast.Join{Left: &ast.TableSource{Source: t}}} + + // handle ... SET col... , col2... , ... + + stmt.List = make([]*ast.Assignment, 0, len(first.sourceTableInfo.Columns)) + var skipColIdx []int + + whereColumns, _ := first.whereColumnsAndValues() + var ( + whereColumnsExpr ast.ExprNode + whereValuesExpr ast.ExprNode + ) + // row constructor does not support only one value. + if len(whereColumns) == 1 { + whereColumnsExpr = &ast.ColumnNameExpr{ + Name: &ast.ColumnName{Name: model.NewCIStr(whereColumns[0])}, + } + whereValuesExpr = &driver.ParamMarkerExpr{} + } else { + e := &ast.RowExpr{Values: make([]ast.ExprNode, 0, len(whereColumns))} + for _, col := range whereColumns { + e.Values = append(e.Values, &ast.ColumnNameExpr{ + Name: &ast.ColumnName{Name: model.NewCIStr(col)}, + }) + } + whereColumnsExpr = e + + e2 := &ast.RowExpr{Values: make([]ast.ExprNode, 0, len(whereColumns))} + for range whereColumns { + e2.Values = append(e2.Values, &driver.ParamMarkerExpr{}) + } + whereValuesExpr = e2 + } + + // WHEN (c1, c2) = (?, ?) THEN ? 
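+	// The same WhenClause node is deliberately shared by every row below:
+	// only placeholders live in the AST, and the per-row values are supplied
+	// positionally through the args slice assembled at the end.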
+	whenCommon := &ast.WhenClause{
+		Expr: &ast.BinaryOperationExpr{
+			Op: opcode.EQ,
+			L:  whereColumnsExpr,
+			R:  whereValuesExpr,
+		},
+		Result: &driver.ParamMarkerExpr{},
+	}
+	// Every row change contributes one WHEN case, identified by its PK/UK
+	// values. The cases restore to identical text, so one shared AST node is
+	// enough; only the bound arguments differ per row.
+	allWhenCases := make([]*ast.WhenClause, len(changes))
+	for i := range allWhenCases {
+		allWhenCases[i] = whenCommon
+	}
+	for i, col := range first.sourceTableInfo.Columns {
+		if isGenerated(first.targetTableInfo.Columns, col.Name) {
+			skipColIdx = append(skipColIdx, i)
+			continue
+		}
+
+		assign := &ast.Assignment{Column: &ast.ColumnName{Name: col.Name}}
+		assign.Expr = &ast.CaseExpr{WhenClauses: allWhenCases}
+		stmt.List = append(stmt.List, assign)
+	}
+
+	// handle ... WHERE IN ...
+
+	where := &ast.PatternInExpr{Expr: whereColumnsExpr}
+	stmt.Where = where
+	// every row change contributes one element to the IN list
+	where.List = make([]ast.ExprNode, len(changes))
+	for i := range where.List {
+		where.List[i] = whereValuesExpr
+	}
+
+	// now build the args of the UPDATE SQL
+
+	args := make([]interface{}, 0, len(stmt.List)*len(changes)*(len(whereColumns)+1)+len(changes)*len(whereColumns))
+	argsPerCol := make([][]interface{}, len(stmt.List))
+	for i := range stmt.List {
+		argsPerCol[i] = make([]interface{}, 0, len(changes)*(len(whereColumns)+1))
+	}
+	whereValuesAtTheEnd := make([]interface{}, 0, len(changes)*len(whereColumns))
+	for _, change := range changes {
+		_, whereValues := change.whereColumnsAndValues()
+		// a lightweight sanity check that the number of WHERE values matches;
+		// it does not try to cover every possible mismatch
+		if len(whereValues) != len(whereColumns) {
+			log.L().DPanic("len(whereValues) != len(whereColumns)",
+				zap.Int("len(whereValues)", len(whereValues)),
+				zap.Int("len(whereColumns)", len(whereColumns)),
+				zap.Any("whereValues", whereValues),
+				zap.Stringer("sourceTable", change.sourceTable))
+			return "", nil
+		}
+
+		whereValuesAtTheEnd = append(whereValuesAtTheEnd, whereValues...)
+
+		i := 0 // index into skipColIdx
+		writeableCol := 0
+		for j, val := range change.postValues {
+			if i < len(skipColIdx) && skipColIdx[i] == j {
+				i++
+				continue
+			}
+			argsPerCol[writeableCol] = append(argsPerCol[writeableCol], whereValues...)
+			argsPerCol[writeableCol] = append(argsPerCol[writeableCol], val)
+			writeableCol++
+		}
+	}
+	for _, a := range argsPerCol {
+		args = append(args, a...)
+	}
+	args = append(args, whereValuesAtTheEnd...)
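+	// Resulting argument layout, shown for two changes with a single WHERE
+	// column `a` (an illustration, assuming rows (a1 -> v1) and (a2 -> v2)):
+	//   for each SET column:  a1, v1, a2, v2   (WHERE value, then new value)
+	//   then the IN list:     a1, a2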
+
+	var buf strings.Builder
+	restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &buf)
+	if err := stmt.Restore(restoreCtx); err != nil {
+		log.L().DPanic("failed to generate multi-row UPDATE",
+			zap.Int("numberOfChanges", len(changes)),
+			zap.Error(err))
+		return "", nil
+	}
+	return buf.String(), args
+}
diff --git a/tests/integration_tests/_utils/run_kafka_consumer b/tests/integration_tests/_utils/run_kafka_consumer
index 29ec63d5e4b..f246ce5ed48 100755
--- a/tests/integration_tests/_utils/run_kafka_consumer
+++ b/tests/integration_tests/_utils/run_kafka_consumer
@@ -12,15 +12,16 @@ sink_uri=$2
 consumer_replica_config=$3
 log_suffix=$4
 pwd=$pwd
+downstream_uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false"
 
 echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>"
 cd $workdir
 
 # some consumer may require `consumer_replica_config`, set it separately
 if [ "$consumer_replica_config" != "" ]; then
 	echo "consumer replica config found: $consumer_replica_config"
-	cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
+	cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri ${downstream_uri} --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
 else
-	cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
+	cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri ${downstream_uri} >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
 fi
 
 cd $pwd
diff --git a/tests/integration_tests/batch_dml/conf/diff_config.toml b/tests/integration_tests/batch_dml/conf/diff_config.toml
new file mode 100644
index 00000000000..8ca7f377037
--- /dev/null
+++ b/tests/integration_tests/batch_dml/conf/diff_config.toml
@@ -0,0 +1,29 @@
+# diff Configuration.
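+# sync_diff compares the upstream TiDB (data source `mysql1`, port 4000)
+# against the downstream TiDB (`tidb0`, port 3306) for every table in the
+# `batch_update_to_no_batch` schema that this test creates.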
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] +output-dir = "/tmp/tidb_cdc_test/batch_update_to_no_batch/sync_diff/output" + +source-instances = ["mysql1"] + +target-instance = "tidb0" + +target-check-tables = ["batch_update_to_no_batch.?*"] + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 4000 +user = "root" +password = "" + +[data-sources.tidb0] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "" diff --git a/tests/integration_tests/batch_dml/data/test.sql b/tests/integration_tests/batch_dml/data/test.sql new file mode 100644 index 00000000000..28545dfcdd7 --- /dev/null +++ b/tests/integration_tests/batch_dml/data/test.sql @@ -0,0 +1,177 @@ +drop database if exists `batch_update_to_no_batch`; +create database `batch_update_to_no_batch`; +use `batch_update_to_no_batch`; + +-- multi data type test + +CREATE TABLE cdc_multi_data_type +( + id INT AUTO_INCREMENT, + t_boolean BOOLEAN, + t_bigint BIGINT, + t_double DOUBLE, + t_decimal DECIMAL(38, 19), + t_bit BIT(64), + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + t_time TIME, + t_year YEAR, + t_char CHAR, + t_varchar VARCHAR(10), + t_blob BLOB, + t_text TEXT, + t_enum ENUM ('enum1', 'enum2', 'enum3'), + t_set SET ('a', 'b', 'c'), + t_json JSON, + PRIMARY KEY (id) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8 + COLLATE = utf8_bin; + +-- make sure `nullable` can be handled by the mounter and mq encoding protocol +INSERT INTO cdc_multi_data_type() VALUES (); + +INSERT INTO cdc_multi_data_type( t_boolean, t_bigint, t_double, t_decimal, t_bit + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_char, t_varchar, t_blob, t_text, t_enum + , t_set, t_json) +VALUES ( true, 9223372036854775807, 123.123, 123456789012.123456789012, b'1000001' + , '1000-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 1970 + , '测', '测试', 'blob', '测试text', 'enum2' + , 'a,b', NULL); + +INSERT INTO cdc_multi_data_type( t_boolean, t_bigint, t_double, t_decimal, t_bit + , t_date, t_datetime, t_timestamp, t_time, t_year + , t_char, t_varchar, t_blob, t_text, t_enum + , t_set, t_json) +VALUES ( true, 9223372036854775807, 678, 321, b'1000001' + , '1000-01-01', '9999-12-31 23:59:59', '19731230153000', '23:59:59', 1970 + , '测', '测试', 'blob', '测试text', 'enum2' + , 'a,b', NULL); + +INSERT INTO cdc_multi_data_type(t_boolean) +VALUES (TRUE); + +INSERT INTO cdc_multi_data_type(t_boolean) +VALUES (FALSE); + +INSERT INTO cdc_multi_data_type(t_bigint) +VALUES (-9223372036854775808); + +INSERT INTO cdc_multi_data_type(t_bigint) +VALUES (9223372036854775807); + +INSERT INTO cdc_multi_data_type(t_json) +VALUES ('{ + "key1": "value1", + "key2": "value2" +}'); + +-- view test + +CREATE TABLE t1 +( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + c1 INT NOT NULL +); + +INSERT INTO t1 (c1) +VALUES (1), + (2), + (3), + (4), + (5); + +CREATE VIEW v1 AS +SELECT * +FROM t1 +WHERE c1 > 2; + +-- uk without pk +-- https://internal.pingcap.net/jira/browse/TOOL-714 +-- CDC don't support UK is null + +CREATE TABLE uk_without_pk +( + id INT, + a1 INT NOT NULL, + a3 INT NOT NULL, + UNIQUE KEY dex1 (a1, a3) +); + +INSERT INTO uk_without_pk(id, a1, a3) +VALUES (1, 1, 2); + +INSERT INTO uk_without_pk(id, a1, a3) +VALUES (2, 1, 1); + +UPDATE uk_without_pk +SET id = 10, + a1 = 2 +WHERE a1 = 1; + +UPDATE uk_without_pk +SET id = 100 +WHERE a1 = 10; + +UPDATE uk_without_pk +SET a3 = 4 +WHERE a3 = 1; + +-- bit column +-- Test issue: TOOL-1346 + +CREATE TABLE binlog_insert_bit +( + a BIT(1) PRIMARY KEY, + b 
BIT(64) +); + +INSERT INTO binlog_insert_bit +VALUES (0x01, 0xffffffff); + +UPDATE binlog_insert_bit +SET a = 0x00, + b = 0xfffffffe; + +-- recover test +-- Test issue: TOOL-1407 +CREATE TABLE recover_and_insert +( + id INT PRIMARY KEY, + a INT +); + +INSERT INTO recover_and_insert(id, a) +VALUES (1, -1); + +UPDATE recover_and_insert +SET a = -5 +WHERE id = 1; + +DROP TABLE recover_and_insert; + +RECOVER TABLE recover_and_insert; + +-- make sure we can insert data after recovery +INSERT INTO recover_and_insert(id, a) +VALUES (2, -3); + +-- column null test + +CREATE TABLE `column_is_null` +( + `id` int(11) NOT NULL, + `t` datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4 + COLLATE = utf8mb4_bin; + +INSERT INTO `column_is_null`(id) +VALUES (1), + (2); +UPDATE `column_is_null` +SET t = NULL +WHERE id = 1; diff --git a/tests/integration_tests/batch_dml/data/test_finish.sql b/tests/integration_tests/batch_dml/data/test_finish.sql new file mode 100644 index 00000000000..b375cb6f8f7 --- /dev/null +++ b/tests/integration_tests/batch_dml/data/test_finish.sql @@ -0,0 +1,7 @@ +-- mark finish table +USE `batch_update_to_no_batch`; + +CREATE TABLE finish_mark +( + a int primary key +); diff --git a/tests/integration_tests/batch_dml/data/test_v5.sql b/tests/integration_tests/batch_dml/data/test_v5.sql new file mode 100644 index 00000000000..1d1cc273083 --- /dev/null +++ b/tests/integration_tests/batch_dml/data/test_v5.sql @@ -0,0 +1,27 @@ +-- test add and drop columns +USE `batch_update_to_no_batch`; + +CREATE TABLE `add_and_drop_columns` +( + `id` int(11) NOT NULL PRIMARY KEY +); + +insert into `add_and_drop_columns` (id) +values (1); + +alter table `add_and_drop_columns` + add col1 int null, + add col2 int null, + add col3 int null; + +insert into `add_and_drop_columns` (id, col1, col2, col3) +values (2, 3, 4, 5); + +insert into `add_and_drop_columns` (id) values (3); + +alter table `add_and_drop_columns` +drop col1, + drop col2; + +insert into `add_and_drop_columns` (id, col3) +values (4, 5); diff --git a/tests/integration_tests/batch_dml/run.sh b/tests/integration_tests/batch_dml/run.sh new file mode 100644 index 00000000000..dccb450c0cb --- /dev/null +++ b/tests/integration_tests/batch_dml/run.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +# This integration test is used to test the following scenario: +# 1. cdc works well in batch mode +# 2. cdc works well in no-batch mode +# 3. 
cdc can switch between batch mode and no-batch mode and still work correctly
+function run() {
+	# batch dml mode is only supported by the mysql sink
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		return
+	fi
+
+	rm -rf $WORK_DIR && mkdir -p $WORK_DIR
+
+	start_tidb_cluster --workdir $WORK_DIR
+
+	cd $WORK_DIR
+
+	run_sql "set global tidb_enable_change_multi_schema = on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+	# This must be set before cdc server starts
+	run_sql "set global tidb_enable_change_multi_schema = on" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	# TiDB caches global variables for up to 2 seconds
+	sleep 2
+
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	# this test contains `recover table`, which requires the super privilege,
+	# so we can't use the normal user
+	SINK_URI="mysql://root@127.0.0.1:3306/?batch-dml-enable=true"
+
+	changefeed_id="test"
+	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# pause changefeed
+	run_cdc_cli changefeed pause -c ${changefeed_id}
+	# update the changefeed, keeping batch dml mode enabled
+	run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=true" --no-confirm
+	# resume changefeed
+	run_cdc_cli changefeed resume -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test_v5.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# pause changefeed
+	run_cdc_cli changefeed pause -c ${changefeed_id}
+	# update the changefeed to no-batch dml mode
+	run_cdc_cli changefeed update -c ${changefeed_id} --sink-uri="mysql://root@127.0.0.1:3306/?batch-dml-enable=false" --no-confirm
+	# resume changefeed
+	run_cdc_cli changefeed resume -c ${changefeed_id}
+
+	run_sql_file $CUR/data/test_finish.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
+
+	# sync_diff can't check tables that don't exist, so first make sure the
+	# expected tables have been created downstream
+	check_table_exists batch_update_to_no_batch.v1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists batch_update_to_no_batch.recover_and_insert ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_exists batch_update_to_no_batch.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
+
+	cleanup_process $CDC_BINARY
+}
+
+trap stop_tidb_cluster EXIT
+run $*
+check_logs $WORK_DIR
+echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
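
For readers skimming the patch, the core sink-side change is the flush rule in `mysqlSinkWorker.run`: whole transactions are accumulated (the worker now hands `[]*model.SingleTableTxn` to `execDMLs`) while `rowNum` tracks their total row count, and the pending batch is flushed before any transaction that would push the count past `maxTxnRow`. The following standalone Go sketch illustrates just that rule; the `txn` type, the `batchByRows` helper, and the sizes in `main` are illustrative assumptions, not code from this patch.

package main

import "fmt"

// txn stands in for model.SingleTableTxn; only its row count matters here.
type txn struct {
	rows []int
}

// batchByRows mirrors the accumulate-then-flush rule: keep appending whole
// transactions while the running row count fits in maxTxnRow, and flush the
// pending batch before any transaction that would overflow it. A transaction
// larger than maxTxnRow still ships as its own batch, matching the worker's
// behaviour of never splitting a transaction across batches.
func batchByRows(txns []txn, maxTxnRow int, flush func(batch []txn)) {
	var (
		pending []txn
		rowNum  int
	)
	for _, t := range txns {
		if len(pending) > 0 && rowNum+len(t.rows) > maxTxnRow {
			flush(pending)
			pending, rowNum = nil, 0
		}
		pending = append(pending, t)
		rowNum += len(t.rows)
	}
	if len(pending) > 0 {
		flush(pending)
	}
}

func main() {
	txns := []txn{
		{rows: make([]int, 2)},
		{rows: make([]int, 3)},
		{rows: make([]int, 4)},
	}
	// With maxTxnRow = 5 this prints "flush 2 txns (5 rows)" for the first
	// two transactions, then "flush 1 txns (4 rows)" for the last one.
	batchByRows(txns, 5, func(batch []txn) {
		rows := 0
		for _, t := range batch {
			rows += len(t.rows)
		}
		fmt.Printf("flush %d txns (%d rows)\n", len(batch), rows)
	})
}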