diff --git a/dm/config/task.go b/dm/config/task.go index 489c7c1ce3..8ee279b953 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -25,7 +25,7 @@ import ( bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/tidb-tools/pkg/table-router" + router "github.com/pingcap/tidb-tools/pkg/table-router" "go.uber.org/zap" yaml "gopkg.in/yaml.v2" ) diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index 464d6edec5..deda978d39 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io/ioutil" + "strings" "sync" "time" @@ -161,7 +162,8 @@ func (t *testServer) TestTaskAutoResume(c *C) { // start task cli := t.createClient(c, fmt.Sprintf("127.0.0.1:%d", port)) subtaskCfgBytes, err := ioutil.ReadFile("./subtask.toml") - _, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: string(subtaskCfgBytes)}) + // strings.Replace is used here to uncomment extra-args to avoid mydumper connecting to DB and generating arg --tables-list which will cause failure + _, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: strings.Replace(string(subtaskCfgBytes), "#extra-args", "extra-args", 1)}) c.Assert(err, IsNil) // check task in paused state diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index 2fa9f9946f..fafe1efa0d 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -53,13 +53,14 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper { cfg: cfg, logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "dump")), } - m.args = m.constructArgs() return m } // Init implements Unit.Init func (m *Mydumper) Init() error { - return nil // always return nil + var err error + m.args, err = m.constructArgs() + return err } // Process implements Unit.Process @@ -233,7 +234,7 @@ func (m *Mydumper) IsFreshTask() (bool, error) { } // constructArgs constructs arguments for exec.Command -func (m *Mydumper) constructArgs() []string { +func (m *Mydumper) constructArgs() ([]string, error) { cfg := m.cfg db := cfg.From @@ -263,11 +264,19 @@ func (m *Mydumper) constructArgs() []string { if len(extraArgs) > 0 { ret = append(ret, ParseArgLikeBash(extraArgs)...) } + if needToGenerateDoTables(extraArgs) { + m.logger.Info("Tables needed to dump are not given, now we will start to generate table list that mydumper needs to dump through black-white list from given fromDB") + doTables, err := fetchMyDumperDoTables(cfg) + if err != nil { + return nil, err + } + ret = append(ret, "--tables-list", doTables) + } m.logger.Info("create mydumper", zap.Strings("argument", ret)) ret = append(ret, "--password", db.Password) - return ret + return ret, nil } // logArgs constructs arguments for log from SubTaskConfig diff --git a/mydumper/mydumper_test.go b/mydumper/mydumper_test.go index bf14427b4e..0461316031 100644 --- a/mydumper/mydumper_test.go +++ b/mydumper/mydumper_test.go @@ -14,12 +14,16 @@ package mydumper import ( + "database/sql" "strings" "testing" . "github.com/pingcap/check" + "github.com/pingcap/tidb-tools/pkg/filter" + router "github.com/pingcap/tidb-tools/pkg/table-router" "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" ) var _ = Suite(&testMydumperSuite{}) @@ -29,7 +33,9 @@ func TestSuite(t *testing.T) { } type testMydumperSuite struct { - cfg *config.SubTaskConfig + cfg *config.SubTaskConfig + origApplyNewBaseDB func(config config.DBConfig) (*conn.BaseDB, error) + origFetchTargetDoTables func(*sql.DB, *filter.Filter, *router.Table) (map[string][]*filter.Table, error) } func (m *testMydumperSuite) SetUpSuite(c *C) { @@ -50,15 +56,68 @@ func (m *testMydumperSuite) SetUpSuite(c *C) { Dir: "./dumped_data", }, } + + m.origApplyNewBaseDB = applyNewBaseDB + m.origFetchTargetDoTables = fetchTargetDoTables + applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) { + return &conn.BaseDB{}, nil + } + fetchTargetDoTables = func(db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) { + mapper := make(map[string][]*filter.Table) + mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{ + Schema: "mockDatabase", + Name: "mockTable1", + }) + mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{ + Schema: "mockDatabase", + Name: "mockTable2", + }) + return mapper, nil + } } -func (m *testMydumperSuite) TestArgs(c *C) { +func (m *testMydumperSuite) TearDownSuite(c *C) { + applyNewBaseDB = m.origApplyNewBaseDB + fetchTargetDoTables = m.origFetchTargetDoTables +} + +func generateArgsAndCompare(c *C, m *testMydumperSuite, expectedExtraArgs, extraArgs string) { expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root " + "--outputdir ./dumped_data --threads 4 --chunk-filesize 64 --skip-tz-utc " + - "--regex ^(?!(mysql|information_schema|performance_schema)) " + - "--password 123") - m.cfg.MydumperConfig.ExtraArgs = "--regex '^(?!(mysql|information_schema|performance_schema))'" + expectedExtraArgs + " --password 123") + m.cfg.MydumperConfig.ExtraArgs = extraArgs + mydumper := NewMydumper(m.cfg) - args := mydumper.constructArgs() + args, err := mydumper.constructArgs() + c.Assert(err, IsNil) c.Assert(args, DeepEquals, expected) } + +func testThroughGivenArgs(c *C, m *testMydumperSuite, arg, index string) { + quotedIndex := "'" + index + "'" // add quotes for constructArgs + generateArgsAndCompare(c, m, arg+" "+index, arg+" "+quotedIndex) +} + +func (m *testMydumperSuite) TestShouldNotGenerateExtraArgs(c *C) { + // -x, --regex + index := "^(?!(mysql|information_schema|performance_schema))" + testThroughGivenArgs(c, m, "-x", index) + testThroughGivenArgs(c, m, "--regex", index) + // -T, --tables-list + index = "testDatabase.testTable" + testThroughGivenArgs(c, m, "-T", index) + testThroughGivenArgs(c, m, "--tables-list", index) + // -B, --database + index = "testDatabase" + testThroughGivenArgs(c, m, "-B", index) + testThroughGivenArgs(c, m, "--database", index) +} + +func (m *testMydumperSuite) TestShouldGenerateExtraArgs(c *C) { + expectedMockResult := "--tables-list mockDatabase.mockTable1,mockDatabase.mockTable2" + // empty extraArgs + generateArgsAndCompare(c, m, expectedMockResult, "") + // extraArgs doesn't contains -T/-B/-x args + statement := "--statement-size=100" + generateArgsAndCompare(c, m, statement+" "+expectedMockResult, statement) +} diff --git a/mydumper/util.go b/mydumper/util.go index 737b6468d3..65317fe420 100644 --- a/mydumper/util.go +++ b/mydumper/util.go @@ -13,6 +13,20 @@ package mydumper +import ( + "strings" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" + "github.com/pingcap/dm/pkg/utils" + + "github.com/pingcap/tidb-tools/pkg/filter" + router "github.com/pingcap/tidb-tools/pkg/table-router" +) + +var applyNewBaseDB = conn.DefaultDBProvider.Apply +var fetchTargetDoTables = utils.FetchTargetDoTables + // ParseArgLikeBash parses list arguments like bash, which helps us to run // executable command via os/exec more likely running from bash func ParseArgLikeBash(args []string) []string { @@ -37,3 +51,46 @@ func trimOutQuotes(arg string) string { } return arg } + +// fetchMyDumperDoTables fetches and filters the tables that needed to be dumped through black-white list and route rules +func fetchMyDumperDoTables(cfg *config.SubTaskConfig) (string, error) { + fromDB, err := applyNewBaseDB(cfg.From) + if err != nil { + return "", err + } + defer fromDB.Close() + bw := filter.New(cfg.CaseSensitive, cfg.BWList) + r, err := router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules) + if err != nil { + return "", err + } + sourceTables, err := fetchTargetDoTables(fromDB.DB, bw, r) + if err != nil { + return "", err + } + var filteredTables []string + // TODO: For tables which contains special chars like ' , ` mydumper will fail while dumping. Once this bug is fixed on mydumper we should add quotes to table.Schema and table.Name + for _, tables := range sourceTables { + for _, table := range tables { + filteredTables = append(filteredTables, table.Schema+"."+table.Name) + } + } + return strings.Join(filteredTables, ","), nil +} + +// needToGenerateDoTables will check whether customers specify the databases/tables that needed to be dumped +// If not, this function will return true to notify mydumper to generate args +func needToGenerateDoTables(args []string) bool { + for _, arg := range args { + if arg == "-B" || arg == "--database" { + return false + } + if arg == "-T" || arg == "--tables-list" { + return false + } + if arg == "-x" || arg == "--regex" { + return false + } + } + return true +} diff --git a/pkg/utils/common.go b/pkg/utils/common.go index 753d0c7bb3..26b80ba8ec 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -26,7 +26,7 @@ import ( tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" - "github.com/pingcap/tidb-tools/pkg/table-router" + router "github.com/pingcap/tidb-tools/pkg/table-router" "go.uber.org/zap" ) diff --git a/tests/all_mode/conf/dm-task.yaml b/tests/all_mode/conf/dm-task.yaml index e53a9d7f90..4e22d0768c 100644 --- a/tests/all_mode/conf/dm-task.yaml +++ b/tests/all_mode/conf/dm-task.yaml @@ -38,7 +38,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B all_mode" + extra-args: "" loaders: global: diff --git a/tests/compatibility/conf/dm-task.yaml b/tests/compatibility/conf/dm-task.yaml index b089d4d528..08fe6877a4 100644 --- a/tests/compatibility/conf/dm-task.yaml +++ b/tests/compatibility/conf/dm-task.yaml @@ -38,7 +38,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B compatibility" + extra-args: "" loaders: global: diff --git a/tests/dmctl_basic/conf/dm-task.yaml b/tests/dmctl_basic/conf/dm-task.yaml index 6b861842fc..74912cd2be 100644 --- a/tests/dmctl_basic/conf/dm-task.yaml +++ b/tests/dmctl_basic/conf/dm-task.yaml @@ -71,7 +71,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "--regex '^dmctl.*'" + extra-args: "" loaders: global: diff --git a/tests/http_apis/conf/dm-task.yaml b/tests/http_apis/conf/dm-task.yaml index 33da3f2b4e..4b1c90f579 100644 --- a/tests/http_apis/conf/dm-task.yaml +++ b/tests/http_apis/conf/dm-task.yaml @@ -30,7 +30,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true - extra-args: "-B http_apis --statement-size=100" + extra-args: "--statement-size=100" loaders: global: diff --git a/tests/incremental_mode/conf/dm-task.yaml b/tests/incremental_mode/conf/dm-task.yaml index ed76ab6bed..30724e4cfd 100644 --- a/tests/incremental_mode/conf/dm-task.yaml +++ b/tests/incremental_mode/conf/dm-task.yaml @@ -44,7 +44,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B incremental_mode" + extra-args: "" loaders: global: diff --git a/tests/initial_unit/conf/dm-task.yaml b/tests/initial_unit/conf/dm-task.yaml index ad31d77dcd..f4515f9870 100644 --- a/tests/initial_unit/conf/dm-task.yaml +++ b/tests/initial_unit/conf/dm-task.yaml @@ -30,7 +30,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true - extra-args: "-B initial_unit --statement-size=100" + extra-args: "--statement-size=100" loaders: global: diff --git a/tests/load_interrupt/conf/dm-task.yaml b/tests/load_interrupt/conf/dm-task.yaml index b6f21924be..e33175801a 100644 --- a/tests/load_interrupt/conf/dm-task.yaml +++ b/tests/load_interrupt/conf/dm-task.yaml @@ -36,7 +36,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true - extra-args: "-B load_interrupt --statement-size=100" + extra-args: "--statement-size=100" loaders: global: diff --git a/tests/online_ddl/conf/dm-task.yaml b/tests/online_ddl/conf/dm-task.yaml index 16586fea84..4957af3ebb 100644 --- a/tests/online_ddl/conf/dm-task.yaml +++ b/tests/online_ddl/conf/dm-task.yaml @@ -75,7 +75,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B online_ddl" + extra-args: "" loaders: global: diff --git a/tests/print_status/conf/dm-task.yaml b/tests/print_status/conf/dm-task.yaml index a3427e5463..8bf4298d66 100644 --- a/tests/print_status/conf/dm-task.yaml +++ b/tests/print_status/conf/dm-task.yaml @@ -30,7 +30,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true - extra-args: "-B print_status --statement-size=5000" + extra-args: "--statement-size=5000" loaders: global: diff --git a/tests/relay_interrupt/conf/dm-task.yaml b/tests/relay_interrupt/conf/dm-task.yaml index 5c71f396fd..8291577e02 100644 --- a/tests/relay_interrupt/conf/dm-task.yaml +++ b/tests/relay_interrupt/conf/dm-task.yaml @@ -30,7 +30,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true - extra-args: "-B relay_interrupt --statement-size=100" + extra-args: "--statement-size=100" loaders: global: diff --git a/tests/safe_mode/conf/dm-task.yaml b/tests/safe_mode/conf/dm-task.yaml index f188c213cb..93495c3fd0 100644 --- a/tests/safe_mode/conf/dm-task.yaml +++ b/tests/safe_mode/conf/dm-task.yaml @@ -71,7 +71,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B safe_mode_test" + extra-args: "" loaders: global: diff --git a/tests/sequence_safe_mode/conf/dm-task.yaml b/tests/sequence_safe_mode/conf/dm-task.yaml index 8811591386..1a31a7cb92 100644 --- a/tests/sequence_safe_mode/conf/dm-task.yaml +++ b/tests/sequence_safe_mode/conf/dm-task.yaml @@ -71,7 +71,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "-B sequence_safe_mode_test" + extra-args: "" loaders: global: diff --git a/tests/sequence_sharding/conf/dm-task.yaml b/tests/sequence_sharding/conf/dm-task.yaml index ed425fc560..c809bf532a 100644 --- a/tests/sequence_sharding/conf/dm-task.yaml +++ b/tests/sequence_sharding/conf/dm-task.yaml @@ -71,7 +71,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "--regex '^sharding_seq.*'" + extra-args: "" loaders: global: diff --git a/tests/sharding/conf/dm-task.yaml b/tests/sharding/conf/dm-task.yaml index 49fa40096a..fd2a7ec7ad 100644 --- a/tests/sharding/conf/dm-task.yaml +++ b/tests/sharding/conf/dm-task.yaml @@ -71,7 +71,7 @@ mydumpers: threads: 4 chunk-filesize: 64 skip-tz-utc: true - extra-args: "--regex '^sharding.*'" + extra-args: "" loaders: global: diff --git a/tests/start_task/conf/dm-task.yaml b/tests/start_task/conf/dm-task.yaml index f7fcb809a7..a51b9087e2 100644 --- a/tests/start_task/conf/dm-task.yaml +++ b/tests/start_task/conf/dm-task.yaml @@ -30,6 +30,7 @@ mydumpers: threads: 4 chunk-filesize: 0 skip-tz-utc: true + # start_task integration_test will add failpoint to function fetchTables will cause mydumper failing to start and won't rebuild dm-worker during restart so extra-args is given here extra-args: "-B start_task --statement-size=100" loaders: