Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc/owner: Fix ddl special comment syntax error #3845

Merged
merged 14 commits into from
Dec 20, 2021
Merged
34 changes: 32 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package owner

import (
"context"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
Expand Down Expand Up @@ -482,7 +484,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query)
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, errors.Trace(err)
}

c.ddlEventCache = ddlEvent
if c.redoManager.Enabled() {
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
Expand Down Expand Up @@ -534,3 +540,27 @@ func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
}
return nil
}

// addSpecialComment translate tidb feature to comment
func addSpecialComment(ddlQuery string) (string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are special comments compatible with older version of TiDB (v4.0.0)?

stms, _, err := parser.New().ParseSQL(ddlQuery)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
}
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}
145 changes: 143 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create database test1")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`")

// executing the ddl finished
mockDDLSink.ddlDone = true
Expand All @@ -295,7 +295,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)")

// executing the ddl finished
mockDDLSink.ddlDone = true
Expand Down Expand Up @@ -431,3 +431,144 @@ func testChangefeedReleaseResource(
_, err = os.Stat(redoLogDir)
c.Assert(os.IsNotExist(err), check.IsTrue)
}

func (s *changefeedSuite) TestAddSpecialComment(c *check.C) {
defer testleak.AfterTest(c)()
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int primary key auto_random(2));",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)",
},
{
"create table t1 (id int primary key auto_random);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) auto_id_cache=5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int primary key clustered, v int);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)",
},
{
"alter table t add primary key(a) clustered;",
"ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table clustered_test(id int)",
"CREATE TABLE `clustered_test` (`id` INT)",
},
{
"create database clustered_test",
"CREATE DATABASE `clustered_test`",
},
{
"create database clustered",
"CREATE DATABASE `clustered`",
},
{
"create table clustered (id int)",
"CREATE TABLE `clustered` (`id` INT)",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)",
},
{
"alter table t force auto_increment = 12;",
"ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12",
},
{
"alter table t force, auto_increment = 12;",
"ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12",
},
{
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */",
},
}
for _, ca := range testCase {
re, err := addSpecialComment(ca.input)
c.Check(err, check.IsNil)
c.Check(re, check.Equals, ca.result)
}
c.Assert(func() {
_, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;")
}, check.Panics, "invalid ddlQuery statement size")
}
64 changes: 34 additions & 30 deletions tests/integration_tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

ddls=("create database ddl_reentrant" false
"create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false
"alter table ddl_reentrant.t1 add column b int" false
"alter table ddl_reentrant.t1 drop column b" false
"alter table ddl_reentrant.t1 add key index_a(a)" false
"alter table ddl_reentrant.t1 drop index index_a" false
"truncate table ddl_reentrant.t1" true
"alter table ddl_reentrant.t1 modify a varchar(20)" true
"rename table ddl_reentrant.t1 to ddl_reentrant.t2" false
"alter table ddl_reentrant.t2 alter a set default 'hello'" true
"alter table ddl_reentrant.t2 comment='modify comment'" true
"alter table ddl_reentrant.t2 rename index a to idx_a" false
"create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false
"alter table ddl_reentrant.t3 drop partition p2" false
"alter table ddl_reentrant.t3 truncate partition p0" true
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false
"drop view ddl_reentrant.t3_view" false
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true
"alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true
# cdc parse and restore ddl with flags format.RestoreStringSingleQuotes|format.RestoreNameBackQuotes|format.RestoreKeyWordUppercase|format.RestoreTiDBSpecialComment
ddls=("create database ddl_reentrant" false 'CREATE DATABASE `ddl_reentrant`'
"create table ddl_reentrant.t1 (id int primary key, id2 int not null, a varchar(10) not null, unique a(a), unique id2(id2))" false 'CREATE TABLE `ddl_reentrant`.`t1` (`id` INT PRIMARY KEY,`id2` INT NOT NULL,`a` VARCHAR(10) NOT NULL,UNIQUE `a`(`a`),UNIQUE `id2`(`id2`))'
"alter table ddl_reentrant.t1 add column b int" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD COLUMN `b` INT'
"alter table ddl_reentrant.t1 drop column b" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP COLUMN `b`'
"alter table ddl_reentrant.t1 add key index_a(a)" false 'ALTER TABLE `ddl_reentrant`.`t1` ADD INDEX `index_a`(`a`)'
"alter table ddl_reentrant.t1 drop index index_a" false 'ALTER TABLE `ddl_reentrant`.`t1` DROP INDEX `index_a`'
"truncate table ddl_reentrant.t1" true 'TRUNCATE TABLE `ddl_reentrant`.`t1`'
"alter table ddl_reentrant.t1 modify a varchar(20)" true 'ALTER TABLE `ddl_reentrant`.`t1` MODIFY COLUMN `a` VARCHAR(20)'
"rename table ddl_reentrant.t1 to ddl_reentrant.t2" false 'RENAME TABLE `ddl_reentrant`.`t1` TO `ddl_reentrant`.`t2`'
"alter table ddl_reentrant.t2 alter a set default 'hello'" true 'ALTER TABLE `ddl_reentrant`.`t2` ALTER COLUMN `a` SET DEFAULT _UTF8MB4'"'hello'"
"alter table ddl_reentrant.t2 comment='modify comment'" true 'ALTER TABLE `ddl_reentrant`.`t2` COMMENT = '"'modify comment'"
"alter table ddl_reentrant.t2 rename index a to idx_a" false 'ALTER TABLE `ddl_reentrant`.`t2` RENAME INDEX `a` TO `idx_a`'
"create table ddl_reentrant.t3 (a int primary key, b int) partition by range(a) (partition p0 values less than (1000), partition p1 values less than (2000))" false 'CREATE TABLE `ddl_reentrant`.`t3` (`a` INT PRIMARY KEY,`b` INT) PARTITION BY RANGE (`a`) (PARTITION `p0` VALUES LESS THAN (1000),PARTITION `p1` VALUES LESS THAN (2000))'
"alter table ddl_reentrant.t3 add partition (partition p2 values less than (3000))" false 'ALTER TABLE `ddl_reentrant`.`t3` ADD PARTITION (PARTITION `p2` VALUES LESS THAN (3000))'
"alter table ddl_reentrant.t3 drop partition p2" false 'ALTER TABLE `ddl_reentrant`.`t3` DROP PARTITION `p2`'
"alter table ddl_reentrant.t3 truncate partition p0" true 'ALTER TABLE `ddl_reentrant`.`t3` TRUNCATE PARTITION `p0`'
"create view ddl_reentrant.t3_view as select a, b from ddl_reentrant.t3" false 'CREATE ALGORITHM = UNDEFINED DEFINER = CURRENT_USER SQL SECURITY DEFINER VIEW `ddl_reentrant`.`t3_view` AS SELECT `a`,`b` FROM `ddl_reentrant`.`t3`'
"drop view ddl_reentrant.t3_view" false 'DROP VIEW `ddl_reentrant`.`t3_view`'
"alter table ddl_reentrant.t3 default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER TABLE `ddl_reentrant`.`t3` CHARACTER SET UTF8MB4 COLLATE UTF8MB4_UNICODE_CI'
"alter schema ddl_reentrant default character set utf8mb4 default collate utf8mb4_unicode_ci" true 'ALTER DATABASE `ddl_reentrant` CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci'
)

function complete_ddls() {
Expand All @@ -36,14 +37,14 @@ function complete_ddls() {
echo "skip some DDLs in tidb v4.0.x"
else
# DDLs that are supportted since 5.0
ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false)
ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false)
ddls+=("alter table ddl_reentrant.t2 add column c1 int, add column c2 int, add column c3 int" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD COLUMN `c1` INT, ADD COLUMN `c2` INT, ADD COLUMN `c3` INT')
ddls+=("alter table ddl_reentrant.t2 drop column c1, drop column c2, drop column c3" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP COLUMN `c1`, DROP COLUMN `c2`, DROP COLUMN `c3`')
fi
ddls+=("alter table ddl_reentrant.t2 drop primary key" false)
ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false)
ddls+=("drop table ddl_reentrant.t2" false)
ddls+=("recover table ddl_reentrant.t2" false)
ddls+=("drop database ddl_reentrant" false)
ddls+=("alter table ddl_reentrant.t2 drop primary key" false 'ALTER TABLE `ddl_reentrant`.`t2` DROP PRIMARY KEY')
ddls+=("alter table ddl_reentrant.t2 add primary key pk(id)" false 'ALTER TABLE `ddl_reentrant`.`t2` ADD PRIMARY KEY `pk`(`id`)')
ddls+=("drop table ddl_reentrant.t2" false 'DROP TABLE `ddl_reentrant`.`t2`')
ddls+=("recover table ddl_reentrant.t2" false 'RECOVER TABLE `ddl_reentrant`.`t2`')
ddls+=("drop database ddl_reentrant" false 'DROP DATABASE `ddl_reentrant`')
}

changefeedid=""
Expand Down Expand Up @@ -94,14 +95,15 @@ tidb_build_branch=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} -e \
function ddl_test() {
ddl=$1
is_reentrant=$2
restored_sql=$3

echo "------------------------------------------"
echo "test ddl $ddl, is_reentrant: $is_reentrant"
echo "test ddl $ddl, is_reentrant: $is_reentrant restored_sql: $restored_sql"

run_sql $ddl ${UP_TIDB_HOST} ${UP_TIDB_PORT}
ensure 10 check_ts_forward $changefeedid

echo $ddl >${WORK_DIR}/ddl_temp.sql
echo $restored_sql >${WORK_DIR}/ddl_temp.sql
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log | tail -n 1 | grep -oE '"CommitTs\\":[0-9]{18}' | awk -F: '{print $(NF)}')
cdc cli changefeed remove --changefeed-id=${changefeedid}
Expand Down Expand Up @@ -146,7 +148,9 @@ function run() {
idx=$((idx + 1))
idxs_reentrant=${ddls[$idx]}
idx=$((idx + 1))
ddl_test $ddl $idxs_reentrant
restored_sql=${ddls[$idx]}
idx=$((idx + 1))
ddl_test $ddl $idxs_reentrant $restored_sql
done
IFS=$OLDIFS

Expand Down