Skip to content

Commit

Permalink
mysql (ticdc): back port batch dml to release 6.1 (#7857)
Browse files Browse the repository at this point in the history
ref #7653
  • Loading branch information
asddongmen authored Jan 10, 2023
1 parent ee2a385 commit 8e90b44
Show file tree
Hide file tree
Showing 16 changed files with 1,628 additions and 114 deletions.
181 changes: 181 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@
package entry

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/pkg/sqlmodel"

"github.com/pingcap/log"
ticonfig "github.com/pingcap/tidb/config"
tidbkv "github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -961,3 +970,175 @@ func TestGetDefaultZeroValue(t *testing.T) {
require.Equal(t, tc.Default, val, tc.Name)
}
}

// TestBuildTableInfo checks that a TableInfo rebuilt by model.BuildTiDBTableInfo
// from CDC columns renders the expected SHOW CREATE TABLE text and still
// exposes a unique not-null index, both with the full column set and after
// every non-handle column has been replaced by nil.
func TestBuildTableInfo(t *testing.T) {
	testCases := []struct {
		origin              string
		recovered           string
		recoveredWithNilCol string
	}{
		{
			origin: "CREATE TABLE t1 (c INT PRIMARY KEY)",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) NOT NULL,\n" +
				" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
			recoveredWithNilCol: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) NOT NULL,\n" +
				" PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				" c INT UNSIGNED," +
				" c2 VARCHAR(10) NOT NULL," +
				" c3 BIT(10) NOT NULL," +
				" UNIQUE KEY (c2, c3)" +
				")",
			// CDC discards field length.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) unsigned DEFAULT NULL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `c3` bit(0) NOT NULL,\n" +
				" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
			recoveredWithNilCol: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `c3` bit(0) NOT NULL,\n" +
				" UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				" c INT UNSIGNED," +
				" gen INT AS (c+1) VIRTUAL," +
				" c2 VARCHAR(10) NOT NULL," +
				" gen2 INT AS (c+2) STORED," +
				" c3 BIT(10) NOT NULL," +
				" PRIMARY KEY (c, c2)" +
				")",
			// CDC drops the virtual generated column, and drops the generating
			// expression of the stored generated column.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) unsigned NOT NULL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
				" `c3` bit(0) NOT NULL,\n" +
				" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
			recoveredWithNilCol: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `c` int(0) unsigned NOT NULL,\n" +
				" `c2` varchar(0) NOT NULL,\n" +
				" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
				" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
				" PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE `t1` (" +
				" `a` int(11) NOT NULL," +
				" `b` int(11) DEFAULT NULL," +
				" `c` int(11) DEFAULT NULL," +
				" PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */," +
				" UNIQUE KEY `b` (`b`)" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `a` int(0) NOT NULL,\n" +
				" `b` int(0) DEFAULT NULL,\n" +
				" `c` int(0) DEFAULT NULL,\n" +
				" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */,\n" +
				" UNIQUE KEY `idx_1` (`b`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
			recoveredWithNilCol: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `a` int(0) NOT NULL,\n" +
				" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
				" `omitted` unspecified CHARACTER SET COLLATE GENERATED ALWAYS AS (pass_generated_check) VIRTUAL,\n" +
				" PRIMARY KEY (`a`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
	}

	sqlParser := parser.New()
	for _, tc := range testCases {
		createStmt, err := sqlParser.ParseOneStmt(tc.origin, "", "")
		require.NoError(t, err)
		upstreamTI, err := ddl.BuildTableInfoFromAST(createStmt.(*ast.CreateTableStmt))
		require.NoError(t, err)

		cdcTableInfo := model.WrapTableInfo(0, "test", 0, upstreamTI)
		cols, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
		require.NoError(t, err)

		// verifyRecovered rebuilds the TableInfo from the current contents of
		// cols and compares its SHOW CREATE TABLE rendering against want.
		verifyRecovered := func(want string) {
			rebuilt := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
			whereHandle := sqlmodel.GetWhereHandle(rebuilt, rebuilt)
			require.NotNil(t, whereHandle.UniqueNotNullIdx)
			require.Equal(t, want, showCreateTable(t, rebuilt))
		}

		verifyRecovered(tc.recovered)

		// Mimic the old value feature being disabled: every column that is
		// not part of the handle key is set to nil.
		for idx, col := range cols {
			if col.Flag.IsHandleKey() {
				continue
			}
			cols[idx] = nil
		}

		verifyRecovered(tc.recoveredWithNilCol)
	}
}

// tiCtx is the mock context handed to the SHOW CREATE TABLE renderer by
// showCreateTable; it is created once and shared by the tests in this file.
var tiCtx = mock.NewContext()

// showCreateTable renders ti through executor.ConstructResultOfShowCreateTable
// and returns the resulting statement text. The test fails immediately if the
// renderer reports an error.
func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
	buf := bytes.NewBuffer(make([]byte, 0, 512))
	require.NoError(t, executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, buf))
	return buf.String()
}

// TestNewDMRowChange rebuilds a TableInfo from hand-written CDC columns, feeds
// it to sqlmodel.NewRowChange, and checks the DELETE statements generated for
// a single row and for a batch of two identical rows.
func TestNewDMRowChange(t *testing.T) {
	testCases := []struct {
		origin    string
		recovered string
	}{
		{
			origin: "CREATE TABLE t1 (id INT," +
				" a1 INT NOT NULL," +
				" a3 INT NOT NULL," +
				" UNIQUE KEY dex1(a1, a3));",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				" `id` int(0) DEFAULT NULL,\n" +
				" `a1` int(0) NOT NULL,\n" +
				" `a3` int(0) NOT NULL,\n" +
				" UNIQUE KEY `idx_0` (`a1`(0),`a3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
	}

	sqlParser := parser.New()
	for _, tc := range testCases {
		createStmt, err := sqlParser.ParseOneStmt(tc.origin, "", "")
		require.NoError(t, err)
		upstreamTI, err := ddl.BuildTableInfoFromAST(createStmt.(*ast.CreateTableStmt))
		require.NoError(t, err)
		wrapped := model.WrapTableInfo(0, "test", 0, upstreamTI)

		// NOTE(review): Type and Flag are raw numeric values; presumably 65 is
		// binary|nullable and 51 is binary|handle|unique|multiple — confirm
		// against the column flag definitions.
		rowCols := []*model.Column{
			{Name: "id", Type: 3, Charset: "binary", Flag: 65, Value: 1, Default: nil},
			{Name: "a1", Type: 3, Charset: "binary", Flag: 51, Value: 1, Default: nil},
			{Name: "a3", Type: 3, Charset: "binary", Flag: 51, Value: 2, Default: nil},
		}

		rebuilt := model.BuildTiDBTableInfo(rowCols, wrapped.IndexColumnsOffset)
		require.Equal(t, tc.recovered, showCreateTable(t, rebuilt))

		target := &model.TableName{Schema: "db", Table: "t1"}
		change := sqlmodel.NewRowChange(target, nil, []interface{}{1, 1, 2}, nil, rebuilt, nil, nil)

		// Single-row delete: the WHERE clause uses only the unique key columns.
		singleSQL, singleArgs := change.GenSQL(sqlmodel.DMLDelete)
		require.Equal(t, "DELETE FROM `db`.`t1` WHERE `a1` = ? AND `a3` = ? LIMIT 1", singleSQL)
		require.Equal(t, []interface{}{1, 2}, singleArgs)

		// Batch delete: the same change passed twice yields two value tuples.
		batchSQL, batchArgs := sqlmodel.GenDeleteSQL(change, change)
		require.Equal(t, "DELETE FROM `db`.`t1` WHERE (`a1`,`a3`) IN ((?,?),(?,?))", batchSQL)
		require.Equal(t, []interface{}{1, 2, 1, 2}, batchArgs)
	}
}
100 changes: 100 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"unsafe"

"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -438,6 +440,104 @@ type RedoColumn struct {
Flag uint64 `msg:"flag"`
}

// BuildTiDBTableInfo builds a TiDB TableInfo from given information.
//
// columns is the CDC view of the row's columns; the position of each entry is
// used as the column offset. An entry may be nil, in which case a placeholder
// virtual generated column named "omitted" is emitted so that offsets of the
// remaining columns stay stable.
//
// indexColumns holds, for every index, the offsets (into columns) of the
// columns that make up the index. An index is skipped when it has no columns
// or when its first column is nil. Index names are derived from the position
// in indexColumns ("idx_<i>"), so skipped entries leave gaps in the numbering.
func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo {
	ret := &model.TableInfo{}
	// nowhere will use this field, so we set a debug message
	ret.Name = model.NewCIStr("BuildTiDBTableInfo")

	// Pre-size only when there is something to append, so that an empty input
	// still yields a nil Columns slice as before.
	if len(columns) > 0 {
		ret.Columns = make([]*model.ColumnInfo, 0, len(columns))
	}

	for i, col := range columns {
		columnInfo := &model.ColumnInfo{
			Offset: i,
			State:  model.StatePublic,
		}
		if col == nil {
			// by referring to datum2Column, nil happens when
			// - !IsColCDCVisible, which means the column is a virtual generated
			//   column
			// - !exist && !fillWithDefaultValue, which means upstream does not
			//   send the column value
			// just mock for the first case
			columnInfo.Name = model.NewCIStr("omitted")
			columnInfo.GeneratedExprString = "pass_generated_check"
			columnInfo.GeneratedStored = false
			ret.Columns = append(ret.Columns, columnInfo)
			continue
		}
		columnInfo.Name = model.NewCIStr(col.Name)
		columnInfo.SetType(col.Type)
		// TiKV always use utf8mb4 to store, and collation is not recorded by CDC
		columnInfo.SetCharset(mysql.UTF8MB4Charset)
		columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation)

		// inverse initColumnsFlag
		flag := col.Flag
		if flag.IsBinary() {
			columnInfo.SetCharset("binary")
		}
		if flag.IsGeneratedColumn() {
			// we do not use this field, so we set it to any non-empty string
			columnInfo.GeneratedExprString = "pass_generated_check"
			columnInfo.GeneratedStored = true
		}
		switch {
		case flag.IsHandleKey():
			// a handle key column is also reported as a primary key column,
			// and marks the whole table as using a common handle
			columnInfo.AddFlag(mysql.PriKeyFlag)
			ret.IsCommonHandle = true
		case flag.IsPrimaryKey():
			columnInfo.AddFlag(mysql.PriKeyFlag)
		}
		if flag.IsUniqueKey() {
			columnInfo.AddFlag(mysql.UniqueKeyFlag)
		}
		if !flag.IsNullable() {
			columnInfo.AddFlag(mysql.NotNullFlag)
		}
		if flag.IsMultipleKey() {
			columnInfo.AddFlag(mysql.MultipleKeyFlag)
		}
		if flag.IsUnsigned() {
			columnInfo.AddFlag(mysql.UnsignedFlag)
		}
		ret.Columns = append(ret.Columns, columnInfo)
	}

	for i, colOffsets := range indexColumns {
		if len(colOffsets) == 0 {
			// an index without columns carries no information; skipping it
			// also avoids indexing colOffsets[0] on an empty slice
			continue
		}
		firstCol := columns[colOffsets[0]]
		if firstCol == nil {
			// when the referenced column is nil, we already have a handle index,
			// so we can skip this index.
			// only happens for DELETE event and old value feature is disabled
			continue
		}

		indexInfo := &model.IndexInfo{
			Name:    model.NewCIStr(fmt.Sprintf("idx_%d", i)),
			State:   model.StatePublic,
			Columns: make([]*model.IndexColumn, 0, len(colOffsets)),
		}
		// the index kind is derived from the flags of its first column only
		if firstCol.Flag.IsPrimaryKey() {
			indexInfo.Primary = true
			indexInfo.Unique = true
		}
		if firstCol.Flag.IsUniqueKey() {
			indexInfo.Unique = true
		}

		for _, offset := range colOffsets {
			indexInfo.Columns = append(indexInfo.Columns, &model.IndexColumn{
				Name:   ret.Columns[offset].Name,
				Offset: offset,
			})
		}

		// TODO: revert the "all column set index related flag" to "only the
		// first column set index related flag" if needed

		ret.Indices = append(ret.Indices, indexInfo)
	}
	return ret
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down
Loading

0 comments on commit 8e90b44

Please sign in to comment.