Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

mydumper: generate tables needed to dump for mydumper automatically (#310) #326

Merged
merged 19 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
)
Expand Down
4 changes: 3 additions & 1 deletion dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io/ioutil"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -161,7 +162,8 @@ func (t *testServer) TestTaskAutoResume(c *C) {
// start task
cli := t.createClient(c, fmt.Sprintf("127.0.0.1:%d", port))
subtaskCfgBytes, err := ioutil.ReadFile("./subtask.toml")
_, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: string(subtaskCfgBytes)})
// strings.Replace uncomments extra-args here so that mydumper does not connect to the DB to generate the --tables-list argument, which would cause this test to fail
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only way to handle this problem? It's tricky.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I admit that. Perhaps we can add a failpoint to solve this problem?

_, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: strings.Replace(string(subtaskCfgBytes), "#extra-args", "extra-args", 1)})
c.Assert(err, IsNil)

// check task in paused state
Expand Down
17 changes: 13 additions & 4 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper {
cfg: cfg,
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "dump")),
}
m.args = m.constructArgs()
return m
}

// Init implements Unit.Init
func (m *Mydumper) Init() error {
return nil // always return nil
var err error
m.args, err = m.constructArgs()
return err
}

// Process implements Unit.Process
Expand Down Expand Up @@ -233,7 +234,7 @@ func (m *Mydumper) IsFreshTask() (bool, error) {
}

// constructArgs constructs arguments for exec.Command
func (m *Mydumper) constructArgs() []string {
func (m *Mydumper) constructArgs() ([]string, error) {
cfg := m.cfg
db := cfg.From

Expand Down Expand Up @@ -263,11 +264,19 @@ func (m *Mydumper) constructArgs() []string {
if len(extraArgs) > 0 {
ret = append(ret, ParseArgLikeBash(extraArgs)...)
}
if needToGenerateDoTables(extraArgs) {
m.logger.Info("Tables needed to dump are not given, now we will start to generate table list that mydumper needs to dump through black-white list from given fromDB")
doTables, err := fetchMyDumperDoTables(cfg)
if err != nil {
return nil, err
}
ret = append(ret, "--tables-list", doTables)
}

m.logger.Info("create mydumper", zap.Strings("argument", ret))

ret = append(ret, "--password", db.Password)
return ret
return ret, nil
}

// logArgs constructs arguments for log from SubTaskConfig
Expand Down
71 changes: 65 additions & 6 deletions mydumper/mydumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
package mydumper

import (
"database/sql"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

var _ = Suite(&testMydumperSuite{})
Expand All @@ -29,7 +33,9 @@ func TestSuite(t *testing.T) {
}

type testMydumperSuite struct {
cfg *config.SubTaskConfig
cfg *config.SubTaskConfig
origApplyNewBaseDB func(config config.DBConfig) (*conn.BaseDB, error)
origFetchTargetDoTables func(*sql.DB, *filter.Filter, *router.Table) (map[string][]*filter.Table, error)
}

func (m *testMydumperSuite) SetUpSuite(c *C) {
Expand All @@ -50,15 +56,68 @@ func (m *testMydumperSuite) SetUpSuite(c *C) {
Dir: "./dumped_data",
},
}

m.origApplyNewBaseDB = applyNewBaseDB
m.origFetchTargetDoTables = fetchTargetDoTables
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{}, nil
}
fetchTargetDoTables = func(db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) {
mapper := make(map[string][]*filter.Table)
mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{
Schema: "mockDatabase",
Name: "mockTable1",
})
mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{
Schema: "mockDatabase",
Name: "mockTable2",
})
return mapper, nil
}
}

func (m *testMydumperSuite) TestArgs(c *C) {
// TearDownSuite puts back the package-level hooks that SetUpSuite replaced
// with mocks, using the original values saved on the suite.
func (m *testMydumperSuite) TearDownSuite(c *C) {
	fetchTargetDoTables = m.origFetchTargetDoTables
	applyNewBaseDB = m.origApplyNewBaseDB
}

func generateArgsAndCompare(c *C, m *testMydumperSuite, expectedExtraArgs, extraArgs string) {
expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root " +
"--outputdir ./dumped_data --threads 4 --chunk-filesize 64 --skip-tz-utc " +
"--regex ^(?!(mysql|information_schema|performance_schema)) " +
"--password 123")
m.cfg.MydumperConfig.ExtraArgs = "--regex '^(?!(mysql|information_schema|performance_schema))'"
expectedExtraArgs + " --password 123")
m.cfg.MydumperConfig.ExtraArgs = extraArgs

mydumper := NewMydumper(m.cfg)
args := mydumper.constructArgs()
args, err := mydumper.constructArgs()
c.Assert(err, IsNil)
c.Assert(args, DeepEquals, expected)
}

// testThroughGivenArgs runs generateArgsAndCompare for a single option/value
// pair: the value is wrapped in single quotes in the configured extra args and
// is expected without quotes in the constructed arguments.
func testThroughGivenArgs(c *C, m *testMydumperSuite, arg, index string) {
	configured := arg + " '" + index + "'" // quoted form handed to constructArgs
	wanted := arg + " " + index
	generateArgsAndCompare(c, m, wanted, configured)
}

// TestShouldNotGenerateExtraArgs checks that when the extra args already carry
// a regex, tables-list or database option (short or long form), the constructed
// arguments contain exactly what was given and nothing is generated.
func (m *testMydumperSuite) TestShouldNotGenerateExtraArgs(c *C) {
	cases := []struct {
		short string
		long  string
		value string
	}{
		{"-x", "--regex", "^(?!(mysql|information_schema|performance_schema))"},
		{"-T", "--tables-list", "testDatabase.testTable"},
		{"-B", "--database", "testDatabase"},
	}
	for _, tc := range cases {
		testThroughGivenArgs(c, m, tc.short, tc.value)
		testThroughGivenArgs(c, m, tc.long, tc.value)
	}
}

// TestShouldGenerateExtraArgs checks that a --tables-list built from the mocked
// table fetcher is appended when the extra args select no database, table or
// regex.
func (m *testMydumperSuite) TestShouldGenerateExtraArgs(c *C) {
	const generated = "--tables-list mockDatabase.mockTable1,mockDatabase.mockTable2"

	// no extra args at all
	generateArgsAndCompare(c, m, generated, "")

	// extra args that contain none of -T/-B/-x
	const other = "--statement-size=100"
	generateArgsAndCompare(c, m, other+" "+generated, other)
}
57 changes: 57 additions & 0 deletions mydumper/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@

package mydumper

import (
	"sort"
	"strings"

	"github.com/pingcap/dm/dm/config"
	"github.com/pingcap/dm/pkg/conn"
	"github.com/pingcap/dm/pkg/utils"

	"github.com/pingcap/tidb-tools/pkg/filter"
	router "github.com/pingcap/tidb-tools/pkg/table-router"
)

// applyNewBaseDB produces the BaseDB used to list upstream tables. It is a
// package-level variable rather than a direct call so that tests can replace
// it with a stub.
var applyNewBaseDB = conn.DefaultDBProvider.Apply
// fetchTargetDoTables returns the tables to dump, keyed by a string and
// filtered through the black-white list and router it is given. It is a
// package-level variable for the same reason: tests substitute a mock.
var fetchTargetDoTables = utils.FetchTargetDoTables

// ParseArgLikeBash parses list arguments like bash, which helps us to run
// executable command via os/exec more likely running from bash
func ParseArgLikeBash(args []string) []string {
Expand All @@ -37,3 +51,46 @@ func trimOutQuotes(arg string) string {
}
return arg
}

// fetchMyDumperDoTables obtains a DB handle for cfg.From, asks
// fetchTargetDoTables for the tables that pass the black-white list and route
// rules, and returns them as a comma-separated "schema.table" list (the value
// passed to mydumper's --tables-list option).
//
// The list is sorted before joining: the tables arrive in a map, whose
// iteration order Go randomizes, so without sorting the generated arguments
// (and the log line that prints them) would differ from run to run.
//
// An error is returned if the DB handle cannot be obtained, the route rules
// are rejected by the table router, or fetching the tables fails.
func fetchMyDumperDoTables(cfg *config.SubTaskConfig) (string, error) {
	fromDB, err := applyNewBaseDB(cfg.From)
	if err != nil {
		return "", err
	}
	defer fromDB.Close()
	bw := filter.New(cfg.CaseSensitive, cfg.BWList)
	r, err := router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules)
	if err != nil {
		return "", err
	}
	sourceTables, err := fetchTargetDoTables(fromDB.DB, bw, r)
	if err != nil {
		return "", err
	}
	var filteredTables []string
	// TODO: For tables which contains special chars like ' , ` mydumper will fail while dumping. Once this bug is fixed on mydumper we should add quotes to table.Schema and table.Name
	for _, tables := range sourceTables {
		for _, table := range tables {
			filteredTables = append(filteredTables, table.Schema+"."+table.Name)
		}
	}
	// Make the output independent of map iteration order.
	sort.Strings(filteredTables)
	return strings.Join(filteredTables, ","), nil
}

// needToGenerateDoTables reports whether the table list for mydumper has to be
// generated, i.e. whether the user-supplied extra args do NOT already select
// what to dump.
//
// It returns false as soon as it sees one of the database (-B, --database),
// tables-list (-T, --tables-list) or regex (-x, --regex) options. Long options
// are recognised both as a separate token ("--database foo") and in the
// "--database=foo" spelling, so that a generated --tables-list is never added
// on top of a selection the user already made. It returns true for empty or
// nil args.
func needToGenerateDoTables(args []string) bool {
	for _, arg := range args {
		name := arg
		// A long option may carry its value in the same token ("--regex=^foo");
		// compare only the part before '='.
		if strings.HasPrefix(arg, "--") {
			if i := strings.IndexByte(arg, '='); i >= 0 {
				name = arg[:i]
			}
		}
		switch name {
		case "-B", "--database",
			"-T", "--tables-list",
			"-x", "--regex":
			return false
		}
	}
	return true
}
2 changes: 1 addition & 1 deletion pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion tests/all_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B all_mode"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/compatibility/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B compatibility"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/dmctl_basic/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^dmctl.*'"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/http_apis/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B http_apis --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/incremental_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B incremental_mode"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/initial_unit/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B initial_unit --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/load_interrupt/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B load_interrupt --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B online_ddl"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/print_status/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B print_status --statement-size=5000"
extra-args: "--statement-size=5000"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/relay_interrupt/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B relay_interrupt --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/safe_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B safe_mode_test"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sequence_safe_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B sequence_safe_mode_test"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sequence_sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^sharding_seq.*'"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^sharding.*'"
extra-args: ""

loaders:
global:
Expand Down
1 change: 1 addition & 0 deletions tests/start_task/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
# The start_task integration test adds a failpoint to the function fetchTables, which would cause mydumper to fail to start, and dm-worker is not rebuilt during restart, so extra-args is given explicitly here
extra-args: "-B start_task --statement-size=100"

loaders:
Expand Down