ticdc/owner: Fix ddl special comment syntax error (#3845) #4415

Closed
36 changes: 34 additions & 2 deletions cdc/owner/changefeed.go
@@ -15,13 +15,15 @@ package owner

import (
"context"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
Comment on lines +25 to +26
Member: Can we use pingcap/parser?

"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -30,6 +32,8 @@ import (
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

_ "github.com/pingcap/tidb/parser/test_driver"
)
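The review comment above asks whether the standalone pingcap/parser could be used instead of github.com/pingcap/tidb/parser. A minimal standalone sketch of that alternative is below; it assumes the standalone module ships the same format and test_driver packages and a Parse(sql, charset, collation) method, which has not been checked against the versions pinned in this PR's go.mod. The restore flags match the ones addSpecialComment uses later in this diff.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/pingcap/parser"
	"github.com/pingcap/parser/format"
	_ "github.com/pingcap/parser/test_driver"
)

func main() {
	// The standalone parser's Parse takes explicit charset/collation arguments.
	stmts, _, err := parser.New().Parse("create table t1 (id int) shard_row_id_bits=2;", "", "")
	if err != nil {
		panic(err)
	}
	// Same restore flags as the addSpecialComment helper added in this PR.
	flags := format.RestoreTiDBSpecialComment |
		format.RestoreNameBackQuotes |
		format.RestoreKeyWordUppercase |
		format.RestoreStringSingleQuotes
	var sb strings.Builder
	if err := stmts[0].Restore(format.NewRestoreCtx(flags, &sb)); err != nil {
		panic(err)
	}
	// Prints: CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */
	fmt.Println(sb.String())
}
```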

type changefeed struct {
@@ -431,7 +435,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query)
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, errors.Trace(err)
}

c.ddlEventCache = ddlEvent
}
if job.BinlogInfo.TableInfo != nil && c.schema.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID) {
@@ -492,3 +500,27 @@ func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
func (c *changefeed) Close() {
c.releaseResources()
}

// addSpecialComment translates TiDB-specific DDL syntax into special comments
func addSpecialComment(ddlQuery string) (string, error) {
stms, _, err := parser.New().ParseSQL(ddlQuery)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
}
var sb strings.Builder
// translate TiDB features to special comments
restoreFlags := format.RestoreTiDBSpecialComment
// quote identifiers with backquotes
restoreFlags |= format.RestoreNameBackQuotes
// uppercase keywords
restoreFlags |= format.RestoreKeyWordUppercase
// wrap strings in single quotes
restoreFlags |= format.RestoreStringSingleQuotes
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}
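For illustration, a hypothetical usage sketch of the new helper inside package owner (not part of the PR); the expected output is taken from the TestAddSpecialComment cases added in changefeed_test.go below.

```go
package owner

// exampleAddSpecialComment is a hypothetical snippet showing the helper's
// contract: TiDB-only DDL syntax is rewritten into versioned special comments
// that MySQL-compatible downstreams simply ignore as comments.
func exampleAddSpecialComment() (string, error) {
	query, err := addSpecialComment("create table t1 (id int primary key auto_random(2));")
	if err != nil {
		return "", err
	}
	// query is now:
	// CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)
	return query, nil
}
```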
145 changes: 143 additions & 2 deletions cdc/owner/changefeed_test.go
@@ -271,7 +271,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1")
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`")

// executing the ddl finished
mockAsyncSink.ddlDone = true
@@ -286,7 +286,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)")

// executing the ddl finished
mockAsyncSink.ddlDone = true
@@ -353,3 +353,144 @@ func (s *changefeedSuite) TestFinished(c *check.C) {
c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs)
c.Assert(state.Info.State, check.Equals, model.StateFinished)
}

func (s *changefeedSuite) TestAddSpecialComment(c *check.C) {
defer testleak.AfterTest(c)()
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int primary key auto_random(2));",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)",
},
{
"create table t1 (id int primary key auto_random);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) auto_id_cache=5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int primary key clustered, v int);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)",
},
{
"alter table t add primary key(a) clustered;",
"ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table clustered_test(id int)",
"CREATE TABLE `clustered_test` (`id` INT)",
},
{
"create database clustered_test",
"CREATE DATABASE `clustered_test`",
},
{
"create database clustered",
"CREATE DATABASE `clustered`",
},
{
"create table clustered (id int)",
"CREATE TABLE `clustered` (`id` INT)",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)",
},
{
"alter table t force auto_increment = 12;",
"ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12",
},
{
"alter table t force, auto_increment = 12;",
"ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12",
},
{
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */",
},
}
for _, ca := range testCase {
re, err := addSpecialComment(ca.input)
c.Check(err, check.IsNil)
c.Check(re, check.Equals, ca.result)
}
c.Assert(func() {
_, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;")
}, check.Panics, "invalid ddlQuery statement size")
}
11 changes: 6 additions & 5 deletions go.mod
@@ -17,7 +17,7 @@ require (
github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17
github.com/fatih/color v1.10.0
github.com/frankban/quicktest v1.11.1 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/go-sql-driver/mysql v1.6.0
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.3.4
github.com/golang/snappy v0.0.2 // indirect
@@ -39,25 +39,26 @@ require (
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pingcap/br v5.0.0-nightly.0.20210419090151-03762465b589+incompatible
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7
github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde
github.com/pingcap/tidb v1.1.0-beta.0.20210508083641-8ed1d9d4a798
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220119134945-60d7c78c7dfc
github.com/prometheus/client_golang v1.5.1
github.com/r3labs/diff v1.1.0
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/tikv/pd v1.1.0-beta.0.20210323121136-78679e5e209d
github.com/tinylib/msgp v1.1.0
github.com/uber-go/atomic v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/bbolt v1.3.4 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
go.uber.org/zap v1.16.0
go.uber.org/zap v1.18.1
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect