Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql (ticdc): back port batch dml to release 6.1 #7857

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@
package entry

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/pkg/sqlmodel"

"github.com/pingcap/log"
ticonfig "github.com/pingcap/tidb/config"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -961,3 +970,175 @@ func TestGetDefaultZeroValue(t *testing.T) {
require.Equal(t, tc.Default, val, tc.Name)
}
}

func TestBuildTableInfo(t *testing.T) {
cases := []struct {
origin string
recovered string
recoveredWithNilCol string
}{
{
"CREATE TABLE t1 (c INT PRIMARY KEY)",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" c2 VARCHAR(10) NOT NULL," +
" c3 BIT(10) NOT NULL," +
" UNIQUE KEY (c2, c3)" +
")",
// CDC discards field length.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned DEFAULT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `c3` bit(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE t1 (" +
" c INT UNSIGNED," +
" gen INT AS (c+1) VIRTUAL," +
" c2 VARCHAR(10) NOT NULL," +
" gen2 INT AS (c+2) STORED," +
" c3 BIT(10) NOT NULL," +
" PRIMARY KEY (c, c2)" +
")",
// CDC discards virtual generated column, and generating expression of stored generated column.
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
" `c3` bit(0) NOT NULL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `c` int(0) unsigned NOT NULL,\n" +
" `c2` varchar(0) NOT NULL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
{
"CREATE TABLE `t1` (" +
" `a` int(11) NOT NULL," +
" `b` int(11) DEFAULT NULL," +
" `c` int(11) DEFAULT NULL," +
" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */," +
" UNIQUE KEY `b` (`b`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `b` int(0) DEFAULT NULL,\n" +
" `c` int(0) DEFAULT NULL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */,\n" +
" UNIQUE KEY `idx_1` (`b`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `a` int(0) NOT NULL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))

// mimic the columns are set to nil when old value feature is disabled
for i := range cols {
if !cols[i].Flag.IsHandleKey() {
cols[i] = nil
}
}
recoveredTI = model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle = sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
require.NotNil(t, handle.UniqueNotNullIdx)
require.Equal(t, c.recoveredWithNilCol, showCreateTable(t, recoveredTI))
}
}

var tiCtx = mock.NewContext()

func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
result := bytes.NewBuffer(make([]byte, 0, 512))
err := executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, result)
require.NoError(t, err)
return result.String()
}

func TestNewDMRowChange(t *testing.T) {
cases := []struct {
origin string
recovered string
}{
{
"CREATE TABLE t1 (id INT," +
" a1 INT NOT NULL," +
" a3 INT NOT NULL," +
" UNIQUE KEY dex1(a1, a3));",
"CREATE TABLE `BuildTiDBTableInfo` (\n" +
" `id` int(0) DEFAULT NULL,\n" +
" `a1` int(0) NOT NULL,\n" +
" `a3` int(0) NOT NULL,\n" +
" UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
},
}
p := parser.New()
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
{
Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil,
},
{
Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil,
},
{
Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil,
},
}
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
require.Equal(t, c.recovered, showCreateTable(t, recoveredTI))
tableName := &model.TableName{Schema: "db", Table: "t1"}
rowChange := sqlmodel.NewRowChange(tableName, nil, []interface{}{1, 1, 2}, nil, recoveredTI, nil, nil)
sqlGot, argsGot := rowChange.GenSQL(sqlmodel.DMLDelete)
require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", sqlGot)
require.Equal(t, []interface{}{1, 2}, argsGot)

sqlGot, argsGot = sqlmodel.GenDeleteSQL(rowChange, rowChange)
require.Equal(t, "DELETE FROM `db`.`t1` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", sqlGot)
require.Equal(t, []interface{}{1, 2, 1, 2}, argsGot)
}
}
100 changes: 100 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"unsafe"

"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -438,6 +440,104 @@ type RedoColumn struct {
Flag uint64 `msg:"flag"`
}

// BuildTiDBTableInfo builds a TiDB TableInfo from given information.
func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo {
ret := &model.TableInfo{}
// nowhere will use this field, so we set a debug message
ret.Name = model.NewCIStr("BuildTiDBTableInfo")

for i, col := range columns {
columnInfo := &model.ColumnInfo{
Offset: i,
State: model.StatePublic,
}
if col == nil {
// by referring to datum2Column, nil is happened when
// - !IsColCDCVisible, which means the column is a virtual generated
// column
// - !exist && !fillWithDefaultValue, which means upstream does not
// send the column value
// just mock for the first case
columnInfo.Name = model.NewCIStr("omitted")
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = false
ret.Columns = append(ret.Columns, columnInfo)
continue
}
columnInfo.Name = model.NewCIStr(col.Name)
columnInfo.SetType(col.Type)
// TiKV always use utf8mb4 to store, and collation is not recorded by CDC
columnInfo.SetCharset(mysql.UTF8MB4Charset)
columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation)

// inverse initColumnsFlag
flag := col.Flag
if flag.IsBinary() {
columnInfo.SetCharset("binary")
}
if flag.IsGeneratedColumn() {
// we do not use this field, so we set it to any non-empty string
columnInfo.GeneratedExprString = "pass_generated_check"
columnInfo.GeneratedStored = true
}
if flag.IsHandleKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
ret.IsCommonHandle = true
} else if flag.IsPrimaryKey() {
columnInfo.AddFlag(mysql.PriKeyFlag)
}
if flag.IsUniqueKey() {
columnInfo.AddFlag(mysql.UniqueKeyFlag)
}
if !flag.IsNullable() {
columnInfo.AddFlag(mysql.NotNullFlag)
}
if flag.IsMultipleKey() {
columnInfo.AddFlag(mysql.MultipleKeyFlag)
}
if flag.IsUnsigned() {
columnInfo.AddFlag(mysql.UnsignedFlag)
}
ret.Columns = append(ret.Columns, columnInfo)
}

for i, colOffsets := range indexColumns {
indexInfo := &model.IndexInfo{
Name: model.NewCIStr(fmt.Sprintf("idx_%d", i)),
State: model.StatePublic,
}
firstCol := columns[colOffsets[0]]
if firstCol == nil {
// when the referenced column is nil, we already have a handle index,
// so we can skip this index.
// only happens for DELETE event and old value feature is disabled
continue
}
if firstCol.Flag.IsPrimaryKey() {
indexInfo.Primary = true
indexInfo.Unique = true
}
if firstCol.Flag.IsUniqueKey() {
indexInfo.Unique = true
}

for _, offset := range colOffsets {
col := ret.Columns[offset]

indexCol := &model.IndexColumn{}
indexCol.Name = col.Name
indexCol.Offset = offset
indexInfo.Columns = append(indexInfo.Columns, indexCol)
}

// TODO: revert the "all column set index related flag" to "only the
// first column set index related flag" if needed

ret.Indices = append(ret.Indices, indexInfo)
}
return ret
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down
Loading