diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index 1a772597b12..383b6e2f268 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -479,6 +479,7 @@ ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file." ErrWorkerFailConnectMaster,[code=40077:class=dm-worker:scope=internal:level=high], "Message: cannot join with master endpoints: %v, error: %v, Workaround: Please check network connection of worker and check worker name is unique." ErrWorkerRelayConfigChanging,[code=40079:class=dm-worker:scope=internal:level=low], "Message: relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s, Workaround: Please try again later" +ErrWorkerRouteTableDupMatch,[code=40080:class=dm-worker:scope=internal:level=high], "Message: table %s.%s matches more than one rule, Workaround: please check the route rules in the task config" ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set" ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format." ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag" diff --git a/dm/checker/checker.go b/dm/checker/checker.go index dc688ca89ba..18315083ff8 100644 --- a/dm/checker/checker.go +++ b/dm/checker/checker.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/dumpling" fr "github.com/pingcap/tiflow/dm/pkg/func-rollback" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools" @@ -43,7 +44,6 @@ import ( column "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/dumpling/export" "github.com/pingcap/tidb/parser/mysql" "go.uber.org/atomic" @@ -137,7 +137,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { if err != nil { return terror.ErrTaskCheckGenBAList.Delegate(err) } - r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules) + r, err := router.NewRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules) if err != nil { return terror.ErrTaskCheckGenTableRouter.Delegate(err) } diff --git a/dm/dm/config/subtask.go b/dm/dm/config/subtask.go index 3bfc2813a5d..b4a3d94d9a8 100644 --- a/dm/dm/config/subtask.go +++ b/dm/dm/config/subtask.go @@ -25,11 +25,11 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/pkg/dumpling" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" ) @@ -426,7 +426,7 @@ func (c *SubTaskConfig) 
Adjust(verifyDecryptPassword bool) error { if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil { return terror.ErrConfigGenBAList.Delegate(err) } - if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil { + if _, err := router.NewRouter(c.CaseSensitive, c.RouteRules); err != nil { return terror.ErrConfigGenTableRouter.Delegate(err) } // NewMapping will fill arguments with the default values. diff --git a/dm/errors.toml b/dm/errors.toml index c7cee3f0b32..252d383e171 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -2884,6 +2884,12 @@ description = "" workaround = "Please try again later" tags = ["internal", "low"] +[error.DM-dm-worker-40080] +message = "table %s.%s matches more than one rule" +description = "" +workaround = "please check the route rules in the task config" +tags = ["internal", "high"] + [error.DM-dm-tracer-42001] message = "parse dm-tracer config flag set" description = "" diff --git a/dm/loader/convert_data.go b/dm/loader/convert_data.go index 72e6bdd598b..65bb0a002a9 100644 --- a/dm/loader/convert_data.go +++ b/dm/loader/convert_data.go @@ -24,12 +24,12 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" parserpkg "github.com/pingcap/tiflow/dm/pkg/parser" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/errors" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/parser/ast" ) @@ -234,7 +234,7 @@ func tableName(schema, table string) string { return fmt.Sprintf("`%s`.`%s`", schema, table) } -func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) { +func parseTable(ctx *tcontext.Context, r *router.RouteTable, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) { statement, err := exportStatement(file) if err != nil { return nil, err diff --git a/dm/loader/convert_data_test.go b/dm/loader/convert_data_test.go index de29efa45ed..6a6977184a6 100644 --- a/dm/loader/convert_data_test.go +++ b/dm/loader/convert_data_test.go @@ -15,9 +15,9 @@ package loader import ( cm "github.com/pingcap/tidb-tools/pkg/column-mapping" - router "github.com/pingcap/tidb-tools/pkg/table-router" tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/router" . 
"github.com/pingcap/check" ) @@ -165,7 +165,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) { insertHeadStmt: "INSERT INTO `t` VALUES", } - r, err := router.NewTableRouter(false, rules) + r, err := router.NewRouter(false, rules) c.Assert(err, IsNil) tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source-mysql-01") @@ -193,7 +193,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) { insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES", } - r, err := router.NewTableRouter(false, rules) + r, err := router.NewRouter(false, rules) c.Assert(err, IsNil) tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source-mysql-01") @@ -411,7 +411,7 @@ func (t *testConvertDataSuite) TestParseTableWithExtendColumn(c *C) { extendVal: []string{"t2", "test1", "source1"}, } - r, err := router.NewTableRouter(false, rules) + r, err := router.NewRouter(false, rules) c.Assert(err, IsNil) tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source1") @@ -456,7 +456,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumnExtendColumn(c * extendVal: []string{"t3", "test1", "source1"}, } - r, err := router.NewTableRouter(false, rules) + r, err := router.NewRouter(false, rules) c.Assert(err, IsNil) tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source1") diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 728e3352862..3ecc6492ce8 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -35,6 +35,7 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" fr "github.com/pingcap/tiflow/dm/pkg/func-rollback" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" @@ -42,7 +43,6 @@ import ( "github.com/pingcap/failpoint" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -430,7 +430,7 @@ type Loader struct { fileJobQueue chan *fileJob - tableRouter *router.Table + tableRouter *router.RouteTable baList *filter.Filter columnMapping *cm.Mapping @@ -876,7 +876,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error { var ( err error oldBaList *filter.Filter - oldTableRouter *router.Table + oldTableRouter *router.RouteTable oldColumnMapping *cm.Mapping ) @@ -904,7 +904,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error { // update route, for loader, this almost useless, because schemas often have been restored oldTableRouter = l.tableRouter - l.tableRouter, err = router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules) + l.tableRouter, err = router.NewRouter(cfg.CaseSensitive, cfg.RouteRules) if err != nil { return terror.ErrLoadUnitGenTableRouter.Delegate(err) } @@ -924,7 +924,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error { } func (l *Loader) genRouter(rules []*router.TableRule) error { - l.tableRouter, _ = router.NewTableRouter(l.cfg.CaseSensitive, []*router.TableRule{}) + l.tableRouter, _ = router.NewRouter(l.cfg.CaseSensitive, []*router.TableRule{}) for _, rule := range rules { err := l.tableRouter.AddRule(rule) if err != nil { @@ -1233,7 +1233,7 @@ func 
renameShardingSchema(query, srcSchema, dstSchema string, ansiquote bool) st return SQLReplace(query, srcSchema, dstSchema, ansiquote) } -func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, table string) (targetSchema string, targetTable string) { +func fetchMatchedLiteral(ctx *tcontext.Context, router *router.RouteTable, schema, table string) (targetSchema string, targetTable string) { if schema == "" { // nothing change return schema, table diff --git a/dm/pkg/router/router.go b/dm/pkg/router/router.go new file mode 100644 index 00000000000..10ea7f697b0 --- /dev/null +++ b/dm/pkg/router/router.go @@ -0,0 +1,247 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "regexp" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb-tools/pkg/filter" + tablerouter "github.com/pingcap/tidb-tools/pkg/table-router" + + "github.com/pingcap/tiflow/dm/pkg/terror" +) + +type ( + TableRule = tablerouter.TableRule + Table = filter.Table + FilterRule = filter.Rules + TableExtractor = tablerouter.TableExtractor + SchemaExtractor = tablerouter.SchemaExtractor + SourceExtractor = tablerouter.SourceExtractor +) + +type FilterType = int32 + +const ( + TblFilter FilterType = iota + 1 + SchmFilter +) + +type filterWrapper struct { + filter *filter.Filter + typ FilterType + target Table + + rawRule *TableRule +} + +type RouteTable struct { + filters []*filterWrapper + caseSensitive bool +} + +func NewRouter(caseSensitive bool, rules []*TableRule) (*RouteTable, error) { + r := &RouteTable{ + filters: make([]*filterWrapper, 0), + caseSensitive: caseSensitive, + } + for _, rule := range rules { + if err := r.AddRule(rule); err != nil { + return nil, err + } + } + return r, nil +} + +func (r *RouteTable) AddRule(rule *TableRule) error { + err := rule.Valid() + if err != nil { + return errors.Trace(err) + } + if !r.caseSensitive { + rule.ToLower() + } + newFilter := &filterWrapper{ + rawRule: rule, + } + newFilter.target = Table{ + Schema: rule.TargetSchema, + Name: rule.TargetTable, + } + if len(rule.TablePattern) == 0 { + // raw schema rule + newFilter.typ = SchmFilter + rawFilter, err := filter.New(r.caseSensitive, &FilterRule{ + DoDBs: []string{rule.SchemaPattern}, + }) + if err != nil { + return errors.Annotatef(err, "add rule %+v into table router", rule) + } + newFilter.filter = rawFilter + } else { + newFilter.typ = TblFilter + rawFilter, err := filter.New(r.caseSensitive, &FilterRule{ + DoTables: []*Table{ + { + Schema: rule.SchemaPattern, + Name: rule.TablePattern, + }, + }, + DoDBs: []string{rule.SchemaPattern}, + }) + if err != nil { + return errors.Annotatef(err, "add rule %+v into table router", rule) + } + newFilter.filter = rawFilter + } + r.filters = append(r.filters, newFilter) + return nil +} + +func (r *RouteTable) Route(schema, table string) (string, string, error) { + curTable := &Table{ + Schema: schema, + Name: table, + } + tblRules := make([]*filterWrapper, 0) + schmRules := make([]*filterWrapper, 0) + for _, filterWrapper := range r.filters { + if 
filterWrapper.filter.Match(curTable) { + if filterWrapper.typ == TblFilter { + tblRules = append(tblRules, filterWrapper) + } else { + schmRules = append(schmRules, filterWrapper) + } + } + } + var ( + targetSchema string + targetTable string + ) + if table == "" || len(tblRules) == 0 { + // 1. no need to match table or + // 2. match no table + if len(schmRules) > 1 { + return "", "", terror.ErrWorkerRouteTableDupMatch.Generate(schema, table) + } + if len(schmRules) == 1 { + targetSchema, targetTable = schmRules[0].target.Schema, schmRules[0].target.Name + } + } else { + if len(tblRules) > 1 { + return "", "", terror.ErrWorkerRouteTableDupMatch.Generate(schema, table) + } + targetSchema, targetTable = tblRules[0].target.Schema, tblRules[0].target.Name + } + if len(targetSchema) == 0 { + targetSchema = schema + } + if len(targetTable) == 0 { + targetTable = table + } + return targetSchema, targetTable, nil +} + +func (r *RouteTable) AllRules() ([]TableRule, []TableRule) { + var ( + schmRouteRules []TableRule + tableRouteRules []TableRule + ) + for _, filter := range r.filters { + if filter.typ == SchmFilter { + schmRouteRules = append(schmRouteRules, *filter.rawRule) + } else { + tableRouteRules = append(tableRouteRules, *filter.rawRule) + } + } + return schmRouteRules, tableRouteRules +} + +func (r *RouteTable) FetchExtendColumn(schema, table, source string) ([]string, []string) { + var cols []string + var vals []string + rules := []*filterWrapper{} + curTable := &Table{ + Schema: schema, + Name: table, + } + for _, filter := range r.filters { + if filter.filter.Match(curTable) { + rules = append(rules, filter) + } + } + var ( + schemaRules = make([]*TableRule, 0, len(rules)) + tableRules = make([]*TableRule, 0, len(rules)) + ) + for i := range rules { + rule := rules[i].rawRule + if rule.TablePattern == "" { + schemaRules = append(schemaRules, rule) + } else { + tableRules = append(tableRules, rule) + } + } + if len(tableRules) == 0 && len(schemaRules) == 0 { + return cols, vals + } + var rule *TableRule + if len(tableRules) == 0 { + rule = schemaRules[0] + } else { + rule = tableRules[0] + } + if rule.TableExtractor != nil { + cols = append(cols, rule.TableExtractor.TargetColumn) + vals = append(vals, extractVal(table, rule.TableExtractor)) + } + + if rule.SchemaExtractor != nil { + cols = append(cols, rule.SchemaExtractor.TargetColumn) + vals = append(vals, extractVal(schema, rule.SchemaExtractor)) + } + + if rule.SourceExtractor != nil { + cols = append(cols, rule.SourceExtractor.TargetColumn) + vals = append(vals, extractVal(source, rule.SourceExtractor)) + } + return cols, vals +} + +func extractVal(s string, ext interface{}) string { + var params []string + switch e := ext.(type) { + case *tablerouter.TableExtractor: + if regExpr, err := regexp.Compile(e.TableRegexp); err == nil { + params = regExpr.FindStringSubmatch(s) + } + case *tablerouter.SchemaExtractor: + if regExpr, err := regexp.Compile(e.SchemaRegexp); err == nil { + params = regExpr.FindStringSubmatch(s) + } + case *tablerouter.SourceExtractor: + if regExpr, err := regexp.Compile(e.SourceRegexp); err == nil { + params = regExpr.FindStringSubmatch(s) + } + } + var val strings.Builder + for idx, param := range params { + if idx > 0 { + val.WriteString(param) + } + } + return val.String() +} diff --git a/dm/pkg/router/router_test.go b/dm/pkg/router/router_test.go new file mode 100644 index 00000000000..f0b3ce9d4de --- /dev/null +++ b/dm/pkg/router/router_test.go @@ -0,0 +1,387 @@ +// Copyright 2022 PingCAP, Inc. 
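A minimal usage sketch of the new package, separate from the patch itself. The rule is borrowed from TestRegExprRoute below; a pattern prefixed with "~" is matched as a regular expression, while plain patterns keep the existing wildcard semantics.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/router"
)

func main() {
	r, err := router.NewRouter(true, []*router.TableRule{
		{SchemaPattern: "~test.[0-9]+", TargetSchema: "dtest1"},
	})
	if err != nil {
		panic(err)
	}
	// "tests100" matches the regular expression, so the schema is rewritten;
	// the rule has no table pattern, so the table name passes through.
	schema, table, err := r.Route("tests100", "table1")
	if err != nil {
		panic(err)
	}
	fmt.Println(schema, table) // dtest1 table1
}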
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "fmt" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/tiflow/dm/pkg/terror" + + oldrouter "github.com/pingcap/tidb-tools/pkg/table-router" +) + +var _ = Suite(&testRouteSuite{}) + +type testRouteSuite struct{} + +func TestRoute(t *testing.T) { + TestingT(t) +} + +func (s *testRouteSuite) TestCreateRouter(c *C) { + _, err := NewRouter(true, []*TableRule{}) + c.Assert(err, Equals, nil) + _, err = NewRouter(false, []*TableRule{}) + c.Assert(err, Equals, nil) +} + +func (s *testRouteSuite) TestAddRule(c *C) { + r, err := NewRouter(true, []*TableRule{}) + c.Assert(err, Equals, nil) + rules := []*TableRule{ + { + SchemaPattern: "test1", + TargetSchema: "dtest1", + }, + { + SchemaPattern: "test2", + TablePattern: "table2", + TargetSchema: "dtest2", + TargetTable: "dtable2", + }, + } + for _, rule := range rules { + err = r.AddRule(rule) + c.Assert(err, Equals, nil) // successfully insert + } + r, err = NewRouter(false, []*TableRule{}) + c.Assert(err, Equals, nil) + for _, rule := range rules { + err := r.AddRule(rule) + c.Assert(err, Equals, nil) // successfully insert + } +} + +func (s *testRouteSuite) TestSchemaRoute(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "test1", + TargetSchema: "dtest1", + }, + { + SchemaPattern: "gtest*", + TargetSchema: "dtest", + }, + } + oldRouter, err := oldrouter.NewTableRouter(true, rules) + c.Assert(err, Equals, nil) + newRouter, err := NewRouter(true, rules) + c.Assert(err, Equals, nil) + inputTables := []Table{ + { + Schema: "test1", // match rule 1 + Name: "table1", + }, + { + Schema: "gtesttest", // match rule 2 + Name: "atable", + }, + { + Schema: "ptest", // match neither + Name: "atableg", + }, + } + expectedResult := []Table{ + { + Schema: "dtest1", + Name: "table1", + }, + { + Schema: "dtest", + Name: "atable", + }, + { + Schema: "ptest", + Name: "atableg", + }, + } + for idx := range inputTables { + schema, table := inputTables[idx].Schema, inputTables[idx].Name + expSchema, expTable := expectedResult[idx].Schema, expectedResult[idx].Name + oldSchema, oldTable, err := oldRouter.Route(schema, table) + c.Assert(err, Equals, nil) + newSchema, newTable, err := newRouter.Route(schema, table) + c.Assert(err, Equals, nil) + c.Assert(oldSchema, Equals, expSchema) + c.Assert(oldTable, Equals, expTable) + c.Assert(newSchema, Equals, expSchema) + c.Assert(newTable, Equals, expTable) + } +} + +func (s *testRouteSuite) TestTableRoute(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "test1", + TablePattern: "table1", + TargetSchema: "dtest1", + TargetTable: "dtable1", + }, + { + SchemaPattern: "test*", + TablePattern: "table2", + TargetSchema: "dtest2", + TargetTable: "dtable2", + }, + { + SchemaPattern: "test3", + TablePattern: "table*", + TargetSchema: "dtest3", + TargetTable: "dtable3", + }, + } + inputTables := []*Table{} + expTables := []*Table{} + for i := 1; i <= 3; i++ { + inputTables = append(inputTables, &Table{ + Schema: fmt.Sprintf("test%d", i), + Name: fmt.Sprintf("table%d", i), + }) + expTables = 
append(expTables, &Table{ + Schema: fmt.Sprintf("dtest%d", i), + Name: fmt.Sprintf("dtable%d", i), + }) + } + oldRouter, err := oldrouter.NewTableRouter(true, rules) + c.Assert(err, Equals, nil) + newRouter, err := NewRouter(true, rules) + c.Assert(err, Equals, nil) + for i := range inputTables { + schema, table := inputTables[i].Schema, inputTables[i].Name + expSchema, expTable := expTables[i].Schema, expTables[i].Name + oldSch, oldTbl, _ := oldRouter.Route(schema, table) + newSch, newTbl, _ := newRouter.Route(schema, table) + c.Assert(newSch, Equals, expSchema) + c.Assert(newTbl, Equals, expTable) + c.Assert(oldSch, Equals, expSchema) + c.Assert(oldTbl, Equals, expTable) + } +} + +func (s *testRouteSuite) TestRegExprRoute(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "~test.[0-9]+", + TargetSchema: "dtest1", + }, + { + SchemaPattern: "~test2?[animal|human]", + TablePattern: "~tbl.*[cat|dog]+", + TargetSchema: "dtest2", + TargetTable: "dtable2", + }, + { + SchemaPattern: "~test3_(schema)?.*", + TablePattern: "test3_*", + TargetSchema: "dtest3", + TargetTable: "dtable3", + }, + { + SchemaPattern: "test4s_*", + TablePattern: "~testtable_[donot_delete]?", + TargetSchema: "dtest4", + TargetTable: "dtable4", + }, + } + inputTable := []Table{ + { + Schema: "tests100", + Name: "table1", // match rule 1 + }, + { + Schema: "test2animal", + Name: "tbl_animal_dogcat", // match rule 2 + }, + { + Schema: "test3_schema_meta", + Name: "test3_tail", // match rule 3 + }, + { + Schema: "test4s_2022", + Name: "testtable_donot_delete", // match rule 4 + }, + { + Schema: "mytst5566", + Name: "gtable", // match nothing + }, + } + expectedOutput := []Table{ + { + Schema: "dtest1", + Name: "table1", + }, + { + Schema: "dtest2", + Name: "dtable2", + }, + { + Schema: "dtest3", + Name: "dtable3", + }, + { + Schema: "dtest4", + Name: "dtable4", + }, + { + Schema: "mytst5566", + Name: "gtable", + }, + } + newRouter, err := NewRouter(true, rules) + c.Assert(err, Equals, nil) + for idx := range inputTable { + s, n := inputTable[idx].Schema, inputTable[idx].Name + expSchm, expName := expectedOutput[idx].Schema, expectedOutput[idx].Name + newSchm, newName, err := newRouter.Route(s, n) + c.Assert(err, Equals, nil) + c.Assert(newSchm, Equals, expSchm) + c.Assert(newName, Equals, expName) + } +} + +func (s *testRouteSuite) TestFetchExtendColumn(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "schema*", + TablePattern: "t*", + TargetSchema: "test", + TargetTable: "t", + TableExtractor: &TableExtractor{ + TargetColumn: "table_name", + TableRegexp: "table_(.*)", + }, + SchemaExtractor: &SchemaExtractor{ + TargetColumn: "schema_name", + SchemaRegexp: "schema_(.*)", + }, + SourceExtractor: &SourceExtractor{ + TargetColumn: "source_name", + SourceRegexp: "source_(.*)_(.*)", + }, + }, + { + SchemaPattern: "~s?chema.*", + TargetSchema: "test", + TargetTable: "t2", + SchemaExtractor: &SchemaExtractor{ + TargetColumn: "schema_name", + SchemaRegexp: "(.*)", + }, + SourceExtractor: &SourceExtractor{ + TargetColumn: "source_name", + SourceRegexp: "(.*)", + }, + }, + } + r, err := NewRouter(false, rules) + c.Assert(err, IsNil) + expected := [][]string{ + {"table_name", "schema_name", "source_name"}, + {"t1", "s1", "s1s1"}, + + {"schema_name", "source_name"}, + {"schema_s2", "source_s2"}, + } + + // table level rules have highest priority + extendCol, extendVal := r.FetchExtendColumn("schema_s1", "table_t1", "source_s1_s1") + c.Assert(expected[0], DeepEquals, extendCol) + c.Assert(expected[1], DeepEquals, extendVal) + + // 
only schema rules + extendCol2, extendVal2 := r.FetchExtendColumn("schema_s2", "a_table_t2", "source_s2") + c.Assert(expected[2], DeepEquals, extendCol2) + c.Assert(expected[3], DeepEquals, extendVal2) +} + +func (s *testRouteSuite) TestAllRule(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "~test.[0-9]+", + TargetSchema: "dtest1", + }, + { + SchemaPattern: "~test2?[animal|human]", + TablePattern: "~tbl.*[cat|dog]+", + TargetSchema: "dtest2", + TargetTable: "dtable2", + }, + { + SchemaPattern: "~test3_(schema)?.*", + TablePattern: "test3_*", + TargetSchema: "dtest3", + TargetTable: "dtable3", + }, + { + SchemaPattern: "test4s_*", + TablePattern: "~testtable_[donot_delete]?", + TargetSchema: "dtest4", + TargetTable: "dtable4", + }, + } + r, err := NewRouter(true, rules) + c.Assert(err, Equals, nil) + schemaRules, tableRules := r.AllRules() + c.Assert(len(schemaRules), Equals, 1) + c.Assert(len(tableRules), Equals, 3) + c.Assert(schemaRules[0].SchemaPattern, Equals, rules[0].SchemaPattern) + for i := 0; i < 3; i++ { + c.Assert(tableRules[i].SchemaPattern, Equals, rules[i+1].SchemaPattern) + c.Assert(tableRules[i].TablePattern, Equals, rules[i+1].TablePattern) + } +} + +func (s *testRouteSuite) TestDupMatch(c *C) { + rules := []*TableRule{ + { + SchemaPattern: "~test[0-9]+.*", + TablePattern: "~.*", + TargetSchema: "dtest1", + }, + { + SchemaPattern: "~test2?[a|b]", + TablePattern: "~tbl2", + TargetSchema: "dtest2", + TargetTable: "dtable2", + }, + { + SchemaPattern: "mytest*", + TargetSchema: "mytest", + }, + { + SchemaPattern: "~mytest(_meta)?_schema", + TargetSchema: "test", + }, + } + inputTables := []Table{ + { + Schema: "test2a", // match rule1 and rule2 + Name: "tbl2", + }, + { + Schema: "mytest_meta_schema", // match rule3 and rule4 + Name: "", + }, + } + r, err := NewRouter(true, rules) + c.Assert(err, Equals, nil) + for i := range inputTables { + targetSchm, targetTbl, err := r.Route(inputTables[i].Schema, inputTables[i].Name) + c.Assert(targetSchm, Equals, "") + c.Assert(targetTbl, Equals, "") + c.Assert(terror.ErrWorkerRouteTableDupMatch.Equal(err), Equals, true) + } +} diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 7e8cdf03058..e86fb47ba00 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -584,6 +584,7 @@ const ( codeWorkerFailConnectMaster codeWorkerWaitRelayCatchupGTID codeWorkerRelayConfigChanging + codeWorkerRouteTableDupMatch ) // DM-tracer error code. @@ -1235,7 +1236,7 @@ var ( ErrWorkerTLSConfigNotValid = New(codeWorkerTLSConfigNotValid, ClassDMWorker, ScopeInternal, LevelHigh, "TLS config not valid", "Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file.") ErrWorkerFailConnectMaster = New(codeWorkerFailConnectMaster, ClassDMWorker, ScopeInternal, LevelHigh, "cannot join with master endpoints: %v, error: %v", "Please check network connection of worker and check worker name is unique.") ErrWorkerRelayConfigChanging = New(codeWorkerRelayConfigChanging, ClassDMWorker, ScopeInternal, LevelLow, "relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s", "Please try again later") - + ErrWorkerRouteTableDupMatch = New(codeWorkerRouteTableDupMatch, ClassDMWorker, ScopeInternal, LevelHigh, "table %s.%s matches more than one rule", "please check the route rules in the task config") // DM-tracer error. 
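Two subtleties of RouteTable.Route are worth spelling out; the sketch below uses hypothetical rules, not ones from the tests. A table-level match always takes precedence over a schema-level match, and ErrWorkerRouteTableDupMatch is raised only when two rules of the same granularity match.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/router"
)

func main() {
	// One schema-level rule and one table-level rule, both matching shard1.t1.
	r, err := router.NewRouter(true, []*router.TableRule{
		{SchemaPattern: "shard*", TargetSchema: "by_schema"},
		{SchemaPattern: "shard*", TablePattern: "t*", TargetSchema: "by_table", TargetTable: "t"},
	})
	if err != nil {
		panic(err)
	}
	// Different granularities, so this is not a duplicate match: the
	// table-level rule wins.
	fmt.Println(r.Route("shard1", "t1")) // by_table t <nil>
	// With an empty table name, only schema-level rules are considered.
	fmt.Println(r.Route("shard1", "")) // by_schema  <nil>
}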
ErrTracerParseFlagSet = New(codeTracerParseFlagSet, ClassDMTracer, ScopeInternal, LevelMedium, "parse dm-tracer config flag set", "") ErrTracerConfigTomlTransform = New(codeTracerConfigTomlTransform, ClassDMTracer, ScopeInternal, LevelMedium, "config toml transform", "Please check the configuration file has correct TOML format.") diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 25c4ecb348a..e873374a0ed 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/parser/model" tmysql "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" @@ -34,6 +33,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -123,7 +123,7 @@ func FetchAllDoTables(ctx context.Context, db *sql.DB, bw *filter.Filter) (map[s } // FetchTargetDoTables returns all need to do tables after filtered and routed (fetches from upstream MySQL). -func FetchTargetDoTables(ctx context.Context, db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) { +func FetchTargetDoTables(ctx context.Context, db *sql.DB, bw *filter.Filter, router *router.RouteTable) (map[string][]*filter.Table, error) { // fetch tables from source and filter them sourceTables, err := FetchAllDoTables(ctx, db, bw) diff --git a/dm/pkg/utils/common_test.go b/dm/pkg/utils/common_test.go index 71787336517..c678deadcc0 100644 --- a/dm/pkg/utils/common_test.go +++ b/dm/pkg/utils/common_test.go @@ -21,8 +21,8 @@ import ( "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tiflow/dm/pkg/router" ) var _ = Suite(&testCommonSuite{}) @@ -151,7 +151,7 @@ func (s *testCommonSuite) TestFetchTargetDoTables(c *C) { // empty filter and router, just as upstream. ba, err := filter.New(false, nil) c.Assert(err, IsNil) - r, err := router.NewTableRouter(false, nil) + r, err := router.NewRouter(false, nil) c.Assert(err, IsNil) schemas := []string{"shard1"} @@ -178,7 +178,7 @@ func (s *testCommonSuite) TestFetchTargetDoTables(c *C) { c.Assert(mock.ExpectationsWereMet(), IsNil) // route to the same downstream. - r, err = router.NewTableRouter(false, []*router.TableRule{ + r, err = router.NewRouter(false, []*router.TableRule{ {SchemaPattern: "shard*", TablePattern: "tbl*", TargetSchema: "shard", TargetTable: "tbl"}, }) c.Assert(err, IsNil) diff --git a/dm/syncer/ddl_test.go b/dm/syncer/ddl_test.go index 3b32ddc0f21..365cd0009b9 100644 --- a/dm/syncer/ddl_test.go +++ b/dm/syncer/ddl_test.go @@ -22,7 +22,6 @@ import ( "github.com/DATA-DOG/go-sqlmock" . 
"github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "go.uber.org/zap" @@ -32,6 +31,7 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" parserpkg "github.com/pingcap/tiflow/dm/pkg/parser" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools" @@ -220,7 +220,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) { syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) - syncer.tableRouter, err = router.NewTableRouter(false, []*router.TableRule{ + syncer.tableRouter, err = router.NewRouter(false, []*router.TableRule{ { SchemaPattern: "s1", TargetSchema: "xs1", diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 96ebb22d793..41291504414 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -34,7 +34,6 @@ import ( cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" - router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" @@ -57,6 +56,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/ha" "github.com/pingcap/tiflow/dm/pkg/log" parserpkg "github.com/pingcap/tiflow/dm/pkg/parser" + "github.com/pingcap/tiflow/dm/pkg/router" "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/pkg/shardddl/pessimism" "github.com/pingcap/tiflow/dm/pkg/streamer" @@ -157,7 +157,7 @@ type Syncer struct { isTransactionEnd bool waitTransactionLock sync.Mutex - tableRouter *router.Table + tableRouter *router.RouteTable binlogFilter *bf.BinlogEvent columnMapping *cm.Mapping baList *filter.Filter @@ -2435,7 +2435,7 @@ func (qec *queryEventContext) String() string { } // generateExtendColumn generate extended columns by extractor. 
-func generateExtendColumn(data [][]interface{}, r *router.Table, table *filter.Table, sourceID string) [][]interface{} { +func generateExtendColumn(data [][]interface{}, r *router.RouteTable, table *filter.Table, sourceID string) [][]interface{} { extendCol, extendVal := r.FetchExtendColumn(table.Schema, table.Name, sourceID) if len(extendCol) == 0 { return nil @@ -3168,7 +3168,7 @@ func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (ma } func (s *Syncer) genRouter() error { - s.tableRouter, _ = router.NewTableRouter(s.cfg.CaseSensitive, []*router.TableRule{}) + s.tableRouter, _ = router.NewRouter(s.cfg.CaseSensitive, []*router.TableRule{}) for _, rule := range s.cfg.RouteRules { err := s.tableRouter.AddRule(rule) if err != nil { @@ -3480,7 +3480,7 @@ func (s *Syncer) Update(ctx context.Context, cfg *config.SubTaskConfig) error { var ( err error oldBaList *filter.Filter - oldTableRouter *router.Table + oldTableRouter *router.RouteTable oldBinlogFilter *bf.BinlogEvent oldColumnMapping *cm.Mapping ) @@ -3512,7 +3512,7 @@ func (s *Syncer) Update(ctx context.Context, cfg *config.SubTaskConfig) error { // update route oldTableRouter = s.tableRouter - s.tableRouter, err = router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules) + s.tableRouter, err = router.NewRouter(cfg.CaseSensitive, cfg.RouteRules) if err != nil { return terror.ErrSyncerUnitGenTableRouter.Delegate(err) } diff --git a/dm/tests/all_mode/conf/regexpr-task.yaml b/dm/tests/all_mode/conf/regexpr-task.yaml new file mode 100644 index 00000000000..557129beb40 --- /dev/null +++ b/dm/tests/all_mode/conf/regexpr-task.yaml @@ -0,0 +1,47 @@ +--- +name: regexprtest +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +timezone: "+04:00" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + session: + tidb_skip_utf8_check: 1 + tidb_disable_txn_auto_retry: off + tidb_retry_limit: "10" + +mysql-instances: + - source-id: "mysql-replica-01" + route-rules: ["rule1"] + block-allow-list: "balist1" + loader: + import-mode: "loader" + - source-id: "mysql-replica-02" + route-rules: ["rule2"] + block-allow-list: "balist1" + loader: + import-mode: "loader" + +routes: + rule1: + schema-pattern: "~test2?[animal|human]" + table-pattern: "~tbl.*[cat|dog]+" + target-schema: "dtest2" + target-table: "dtable2" + rule2: + schema-pattern: "test4s_*" + table-pattern: "~testtable_[donot_delete]?" + target-schema: "dtest4" + target-table: "dtable4" + +block-allow-list: + balist1: + do-dbs: ["~test2?[animal|human]", "test4s_*"] diff --git a/dm/tests/all_mode/conf/regexpr_diff_config.toml b/dm/tests/all_mode/conf/regexpr_diff_config.toml new file mode 100644 index 00000000000..88635ba8680 --- /dev/null +++ b/dm/tests/all_mode/conf/regexpr_diff_config.toml @@ -0,0 +1,49 @@ +# diff Configuration. 
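The generateExtendColumn change in the syncer above is driven by the other new API, FetchExtendColumn. A short sketch of its contract, with the rule shape borrowed from TestFetchExtendColumn: the extractors derive extra column values from the upstream schema, table, and source-id names.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/router"
)

func main() {
	r, err := router.NewRouter(false, []*router.TableRule{{
		SchemaPattern:   "schema*",
		TablePattern:    "t*",
		TargetSchema:    "test",
		TargetTable:     "t",
		TableExtractor:  &router.TableExtractor{TargetColumn: "table_name", TableRegexp: "table_(.*)"},
		SchemaExtractor: &router.SchemaExtractor{TargetColumn: "schema_name", SchemaRegexp: "schema_(.*)"},
	}})
	if err != nil {
		panic(err)
	}
	// Capture groups from the regexps become the extend-column values.
	cols, vals := r.FetchExtendColumn("schema_s1", "table_t1", "source-01")
	fmt.Println(cols, vals) // [table_name schema_name] [t1 s1]
}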
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/ticdc_dm_test/output" + + source-instances = ["mysql1", "mysql2"] + + target-instance = "tidb0" + + target-check-tables = ["dtest2.dtable2", "dtest4.dtable4"] + + +[routes.rule1] + schema-pattern = "test2animal" + table-pattern = "tbl_animal_dogcat" + target-schema = "dtest2" + target-table = "dtable2" +[routes.rule2] + schema-pattern = "test4s_*" + table-pattern = "testtable_donot_delete" + target-schema = "dtest4" + target-table = "dtable4" + +[data-sources] +[data-sources.mysql1] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +route-rules = ["rule1"] + +[data-sources.mysql2] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "123456" +route-rules = ["rule2"] + +[data-sources.tidb0] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" \ No newline at end of file diff --git a/dm/tests/all_mode/data/db1.regexpr.sql b/dm/tests/all_mode/data/db1.regexpr.sql new file mode 100644 index 00000000000..ff0c641a604 --- /dev/null +++ b/dm/tests/all_mode/data/db1.regexpr.sql @@ -0,0 +1,13 @@ +-- test regular expression router +drop database if exists `test2animal`; +create database `test2animal`; +use `test2animal`; +create table tbl_animal_dogcat ( + a int, + b int, + primary key(a) +); +insert into tbl_animal_dogcat values +(1, 1), +(2, 2), +(3, 3); \ No newline at end of file diff --git a/dm/tests/all_mode/data/db2.regexpr.sql b/dm/tests/all_mode/data/db2.regexpr.sql new file mode 100644 index 00000000000..789c9b021ac --- /dev/null +++ b/dm/tests/all_mode/data/db2.regexpr.sql @@ -0,0 +1,14 @@ +-- test regular expression router +drop database if exists `test4s_2022`; +create database `test4s_2022`; +use `test4s_2022`; +create table testtable_donot_delete ( + a int, + b int, + primary key(a) +); +insert into testtable_donot_delete values +(1, 1), +(2, 2), +(3, 3), +(4, 4); \ No newline at end of file diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index d933b763d1b..3d43bc83262 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -322,6 +322,61 @@ function test_expression_filter() { echo "[$(date)] <<<<<< finish test_expression_filter >>>>>>" } +function test_regexpr_router() { + echo "[$(date)] <<<<<< start test_regexpr_router >>>>>>" + cleanup_process + cleanup_data all_mode + cleanup_data test2animal + cleanup_data test4s_2022 + cleanup_data_upstream test2animal + cleanup_data_upstream test4s_2022 + run_sql_file $cur/data/db1.regexpr.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.regexpr.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_tidb 'drop database if exists dtest2;' + run_sql_tidb 'create database dtest2;' + run_sql_tidb 'drop database if exists dtest4;' + run_sql_tidb 'create database dtest4;' + run_sql_tidb 'create table if not exists dtest2.dtable2(a int, b int);' + run_sql_tidb 'create table if not exists dtest4.dtable4(a int, b int);' + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + # 
operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + cp $cur/conf/regexpr-task.yaml $WORK_DIR/regexpr-task.yaml + sed -i "s/name: test/name: $ILLEGAL_CHAR_NAME/g" $WORK_DIR/regexpr-task.yaml + + # error config + # there should be an error message like "Incorrect argument type to variable 'tidb_retry_limit'" + # but different TiDB versions output different messages, so we only roughly match here + sed -i 's/tidb_retry_limit: "10"/tidb_retry_limit: "fjs"/g' $WORK_DIR/regexpr-task.yaml + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/regexpr-task.yaml --remove-meta" \ + "tidb_retry_limit" 1 + + sed -i 's/tidb_retry_limit: "fjs"/tidb_retry_limit: "10"/g' $WORK_DIR/regexpr-task.yaml + dmctl_start_task "$WORK_DIR/regexpr-task.yaml" "--remove-meta" + + check_sync_diff $WORK_DIR $cur/conf/regexpr_diff_config.toml + + cleanup_process + cleanup_data test2animal + cleanup_data test4s_2022 + cleanup_data_upstream test2animal + cleanup_data_upstream test4s_2022 + echo "[$(date)] <<<<<< finish test_regexpr_router >>>>>>" +} + function run() { run_sql_both_source "SET @@GLOBAL.SQL_MODE='ANSI_QUOTES,NO_AUTO_VALUE_ON_ZERO'" run_sql_source1 "SET @@global.time_zone = '+01:00';" @@ -331,6 +386,7 @@ function run() { test_session_config test_query_timeout test_stop_task_before_checkpoint + test_regexpr_router inject_points=( "github.com/pingcap/tiflow/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
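For reference, the mapping that check_sync_diff verifies in this test can be reproduced with the new router directly. The sketch uses rule1 from regexpr-task.yaml and the table seeded by db1.regexpr.sql.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/router"
)

func main() {
	// rule1 from regexpr-task.yaml: both patterns are regular expressions.
	r, err := router.NewRouter(true, []*router.TableRule{{
		SchemaPattern: "~test2?[animal|human]",
		TablePattern:  "~tbl.*[cat|dog]+",
		TargetSchema:  "dtest2",
		TargetTable:   "dtable2",
	}})
	if err != nil {
		panic(err)
	}
	// The table created by db1.regexpr.sql is routed into dtest2.dtable2,
	// which is what regexpr_diff_config.toml compares against.
	fmt.Println(r.Route("test2animal", "tbl_animal_dogcat")) // dtest2 dtable2 <nil>
}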