Skip to content

Commit

Permalink
dm: new router compatible with regular expression (pingcap#4358)
Browse files Browse the repository at this point in the history
  • Loading branch information
buchuitoudegou authored and zhaoxinyu committed Feb 16, 2022
1 parent 653af8e commit 2045a3a
Show file tree
Hide file tree
Showing 19 changed files with 852 additions and 31 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrWorkerFailConnectMaster,[code=40077:class=dm-worker:scope=internal:level=high], "Message: cannot join with master endpoints: %v, error: %v, Workaround: Please check network connection of worker and check worker name is unique."
ErrWorkerRelayConfigChanging,[code=40079:class=dm-worker:scope=internal:level=low], "Message: relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s, Workaround: Please try again later"
ErrWorkerRouteTableDupMatch,[code=40080:class=dm-worker:scope=internal:level=high], "Message: table %s.%s matches more than one rule, Workaround: please check the route rules in the task config"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down
4 changes: 2 additions & 2 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
Expand All @@ -43,7 +44,6 @@ import (
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/atomic"
Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if err != nil {
return terror.ErrTaskCheckGenBAList.Delegate(err)
}
r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
r, err := router.NewRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
if err != nil {
return terror.ErrTaskCheckGenTableRouter.Delegate(err)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/dumpling"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
)
Expand Down Expand Up @@ -426,7 +426,7 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil {
return terror.ErrConfigGenBAList.Delegate(err)
}
if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil {
if _, err := router.NewRouter(c.CaseSensitive, c.RouteRules); err != nil {
return terror.ErrConfigGenTableRouter.Delegate(err)
}
// NewMapping will fill arguments with the default values.
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2884,6 +2884,12 @@ description = ""
workaround = "Please try again later"
tags = ["internal", "low"]

[error.DM-dm-worker-40080]
message = "table %s.%s matches more than one rule"
description = ""
workaround = "please check the route rules in the task config"
tags = ["internal", "high"]

[error.DM-dm-tracer-42001]
message = "parse dm-tracer config flag set"
description = ""
Expand Down
4 changes: 2 additions & 2 deletions dm/loader/convert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
parserpkg "github.com/pingcap/tiflow/dm/pkg/parser"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"

"github.com/pingcap/errors"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/parser/ast"
)

Expand Down Expand Up @@ -234,7 +234,7 @@ func tableName(schema, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}

func parseTable(ctx *tcontext.Context, r *router.Table, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) {
func parseTable(ctx *tcontext.Context, r *router.RouteTable, schema, table, file, sqlMode, sourceID string) (*tableInfo, error) {
statement, err := exportStatement(file)
if err != nil {
return nil, err
Expand Down
10 changes: 5 additions & 5 deletions dm/loader/convert_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ package loader

import (
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
router "github.com/pingcap/tidb-tools/pkg/table-router"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/router"

. "github.com/pingcap/check"
)
Expand Down Expand Up @@ -165,7 +165,7 @@ func (t *testConvertDataSuite) TestParseTable(c *C) {
insertHeadStmt: "INSERT INTO `t` VALUES",
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source-mysql-01")
Expand Down Expand Up @@ -193,7 +193,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumn(c *C) {
insertHeadStmt: "INSERT INTO `t` (`id`,`t_json`) VALUES",
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source-mysql-01")
Expand Down Expand Up @@ -411,7 +411,7 @@ func (t *testConvertDataSuite) TestParseTableWithExtendColumn(c *C) {
extendVal: []string{"t2", "test1", "source1"},
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t2", "./dumpfile/test1.t2-schema.sql", "ANSI_QUOTES", "source1")
Expand Down Expand Up @@ -456,7 +456,7 @@ func (t *testConvertDataSuite) TestParseTableWithGeneratedColumnExtendColumn(c *
extendVal: []string{"t3", "test1", "source1"},
}

r, err := router.NewTableRouter(false, rules)
r, err := router.NewRouter(false, rules)
c.Assert(err, IsNil)

tableInfo, err := parseTable(tcontext.Background(), r, "test1", "t3", "./dumpfile/test1.t3-schema.sql", "", "source1")
Expand Down
12 changes: 6 additions & 6 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -430,7 +430,7 @@ type Loader struct {

fileJobQueue chan *fileJob

tableRouter *router.Table
tableRouter *router.RouteTable
baList *filter.Filter
columnMapping *cm.Mapping

Expand Down Expand Up @@ -876,7 +876,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
var (
err error
oldBaList *filter.Filter
oldTableRouter *router.Table
oldTableRouter *router.RouteTable
oldColumnMapping *cm.Mapping
)

Expand Down Expand Up @@ -904,7 +904,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {

// update route, for loader, this almost useless, because schemas often have been restored
oldTableRouter = l.tableRouter
l.tableRouter, err = router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules)
l.tableRouter, err = router.NewRouter(cfg.CaseSensitive, cfg.RouteRules)
if err != nil {
return terror.ErrLoadUnitGenTableRouter.Delegate(err)
}
Expand All @@ -924,7 +924,7 @@ func (l *Loader) Update(ctx context.Context, cfg *config.SubTaskConfig) error {
}

func (l *Loader) genRouter(rules []*router.TableRule) error {
l.tableRouter, _ = router.NewTableRouter(l.cfg.CaseSensitive, []*router.TableRule{})
l.tableRouter, _ = router.NewRouter(l.cfg.CaseSensitive, []*router.TableRule{})
for _, rule := range rules {
err := l.tableRouter.AddRule(rule)
if err != nil {
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func renameShardingSchema(query, srcSchema, dstSchema string, ansiquote bool) st
return SQLReplace(query, srcSchema, dstSchema, ansiquote)
}

func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, table string) (targetSchema string, targetTable string) {
func fetchMatchedLiteral(ctx *tcontext.Context, router *router.RouteTable, schema, table string) (targetSchema string, targetTable string) {
if schema == "" {
// nothing change
return schema, table
Expand Down
Loading

0 comments on commit 2045a3a

Please sign in to comment.