diff --git a/checker/checker.go b/checker/checker.go index 927fe7bb3b..15dff813fd 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -72,8 +72,10 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) * } for _, cfg := range cfgs { + // we have verify it in subtask config + replica, _ := cfg.DecryptPassword() c.instances = append(c.instances, &mysqlInstance{ - cfg: cfg, + cfg: replica, }) } @@ -118,7 +120,6 @@ func (c *Checker) Init() (err error) { User: instance.cfg.From.User, Password: instance.cfg.From.Password, } - instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo) if err != nil { return errors.Trace(err) @@ -130,13 +131,6 @@ func (c *Checker) Init() (err error) { User: instance.cfg.To.User, Password: instance.cfg.To.Password, } - if len(instance.targetDBInfo.Password) > 0 { - pswd, err2 := utils.Decrypt(instance.targetDBInfo.Password) - if err2 != nil { - return errors.Annotatef(err2, "can not decrypt password %s", instance.targetDBInfo.Password) - } - instance.targetDBInfo.Password = pswd - } instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo) if err != nil { return errors.Trace(err) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index ceee578361..40bcde4bfe 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -253,7 +253,8 @@ func (c *SubTaskConfig) adjust() error { } } - return nil + _, err := c.DecryptPassword() + return err } // Parse parses flag definitions from the argument list. @@ -289,3 +290,48 @@ func (c *SubTaskConfig) Parse(arguments []string) error { return errors.Trace(c.adjust()) } + +// DecryptPassword tries to decrypt db password in config +func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) { + clone, err := c.Clone() + if err != nil { + return nil, errors.Trace(err) + } + + var ( + pswdTo string + pswdFrom string + ) + if len(clone.To.Password) > 0 { + pswdTo, err = utils.Decrypt(clone.To.Password) + if err != nil { + return nil, errors.Annotatef(err, "downstream DB") + } + } + if len(clone.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(clone.From.Password) + if err != nil { + return nil, errors.Annotatef(err, "source DB") + } + } + clone.From.Password = pswdFrom + clone.To.Password = pswdTo + + return clone, nil +} + +// Clone returns a replica of SubTaskConfig +func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) { + content, err := c.Toml() + if err != nil { + return nil, errors.Trace(err) + } + + clone := &SubTaskConfig{} + _, err = toml.Decode(content, clone) + if err != nil { + return nil, errors.Trace(err) + } + + return clone, nil +} diff --git a/dm/config/subtask_test.go b/dm/config/subtask_test.go new file mode 100644 index 0000000000..0000325d1d --- /dev/null +++ b/dm/config/subtask_test.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + . "github.com/pingcap/check" +) + +func (t *testConfig) TestSubTask(c *C) { + cfg := &SubTaskConfig{ + From: DBConfig{ + Host: "127.0.0.1", + Port: 3306, + User: "root", + Password: "Up8156jArvIPymkVC+5LxkAT6rek", + }, + To: DBConfig{ + Host: "127.0.0.1", + Port: 4306, + User: "root", + Password: "", + }, + } + + clone1, err := cfg.Clone() + c.Assert(err, IsNil) + c.Assert(cfg, DeepEquals, clone1) + + clone1.From.Password = "1234" + clone2, err := cfg.DecryptPassword() + c.Assert(err, IsNil) + c.Assert(clone2, DeepEquals, clone1) + + cfg.From.Password = "xxx" + clone3, err := cfg.DecryptPassword() + c.Assert(err, NotNil) + + cfg.From.Password = "" + clone3, err = cfg.DecryptPassword() + c.Assert(clone3, DeepEquals, cfg) +} diff --git a/dm/master/server.go b/dm/master/server.go index 91879e6631..fe30966ad4 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1718,7 +1718,7 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C if err != nil { return &pb.CheckTaskResponse{ Result: false, - Msg: errors.Cause(err).Error(), + Msg: errors.ErrorStack(err), }, nil } diff --git a/dm/tracer/server.go b/dm/tracer/server.go index 99babf056e..a64eeac1ad 100644 --- a/dm/tracer/server.go +++ b/dm/tracer/server.go @@ -14,6 +14,7 @@ package tracer import ( + "context" "net" "net/http" "sync" @@ -23,7 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/common" diff --git a/dm/worker/config.go b/dm/worker/config.go index 8d1fe2b03b..7265708a27 100644 --- a/dm/worker/config.go +++ b/dm/worker/config.go @@ -77,6 +77,7 @@ type Config struct { EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` RelayDir string `toml:"relay-dir" json:"relay-dir"` + MetaDir string `toml:"meta-dir" json:"meta-dir"` ServerID int `toml:"server-id" json:"server-id"` Flavor string `toml:"flavor" json:"flavor"` Charset string `toml:"charset" json:"charset"` @@ -118,29 +119,12 @@ func (c *Config) String() string { // Toml returns TOML format representation of config func (c *Config) Toml() (string, error) { var b bytes.Buffer - var pswd string - var err error - - enc := toml.NewEncoder(&b) - if len(c.From.Password) > 0 { - pswd, err = utils.Encrypt(c.From.Password) - if err != nil { - return "", errors.Annotatef(err, "can not encrypt password %s", c.From.Password) - } - } - c.From.Password = pswd - err = enc.Encode(c) + err := toml.NewEncoder(&b).Encode(c) if err != nil { log.Errorf("[worker] marshal config to toml error %v", err) } - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) - if err != nil { - return "", errors.Annotatef(err, "can not decrypt password %s", c.From.Password) - } - } - c.From.Password = pswd + return string(b.String()), nil } @@ -189,15 +173,9 @@ func (c *Config) Parse(arguments []string) error { return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0)) } - // try decrypt password - var pswd string - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) - if err != nil { - return errors.Annotatef(err, "can not decrypt password %s", c.From.Password) - } + if len(c.MetaDir) == 0 { + c.MetaDir = "./dm_worker_meta" } - c.From.Password = pswd // assign tracer id to source id c.Tracer.Source = c.SourceID @@ -211,25 +189,40 @@ func (c *Config) verify() error { return errors.Errorf("dm-worker should bind a non-empty source ID which represents a MySQL/MariaDB instance or a replica group. \n notice: if you use old version dm-ansible, please update to newest version.") } + var err error if len(c.RelayBinLogName) > 0 { - _, err := streamer.GetBinlogFileIndex(c.RelayBinLogName) + _, err = streamer.GetBinlogFileIndex(c.RelayBinLogName) if err != nil { return errors.Annotatef(err, "relay-binlog-name %s", c.RelayBinLogName) } } if len(c.RelayBinlogGTID) > 0 { - _, err := gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID) + _, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID) if err != nil { return errors.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID) } } + + _, err = c.DecryptPassword() + if err != nil { + return errors.Trace(err) + } + return nil } // configFromFile loads config from file. func (c *Config) configFromFile(path string) error { _, err := toml.DecodeFile(path, c) - return errors.Trace(err) + if err != nil { + return errors.Trace(err) + } + + err = c.verify() + if err != nil { + return errors.Trace(err) + } + return nil } // UpdateConfigFile write configure to local file @@ -246,25 +239,32 @@ func (c *Config) UpdateConfigFile(content string) error { // Reload reload configure from ConfigFile func (c *Config) Reload() error { - var pswd string - var err error - if c.ConfigFile == "" { c.ConfigFile = "dm-worker-config.bak" } - err = c.configFromFile(c.ConfigFile) + err := c.configFromFile(c.ConfigFile) if err != nil { return errors.Trace(err) } - if len(c.From.Password) > 0 { - pswd, err = utils.Decrypt(c.From.Password) + return nil +} + +// DecryptPassword returns a decrypted config replica in config +func (c *Config) DecryptPassword() (*Config, error) { + clone := c.Clone() + var ( + pswdFrom string + err error + ) + if len(clone.From.Password) > 0 { + pswdFrom, err = utils.Decrypt(clone.From.Password) if err != nil { - return errors.Annotatef(err, "can not decrypt password %s", c.From.Password) + return nil, errors.Trace(err) } } - c.From.Password = pswd + clone.From.Password = pswdFrom - return nil + return clone, nil } diff --git a/dm/worker/config_test.go b/dm/worker/config_test.go new file mode 100644 index 0000000000..548e085dbd --- /dev/null +++ b/dm/worker/config_test.go @@ -0,0 +1,42 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + . "github.com/pingcap/check" +) + +func (t *testWorker) TestConfig(c *C) { + cfg := &Config{} + + err := cfg.configFromFile("./dm-worker.toml") + c.Assert(err, IsNil) + c.Assert(cfg.SourceID, Equals, "mysql-replica-01") + + clone1 := cfg.Clone() + c.Assert(cfg, DeepEquals, clone1) + + clone1.From.Password = "1234" + clone2, err := cfg.DecryptPassword() + c.Assert(err, IsNil) + c.Assert(clone2, DeepEquals, clone1) + + cfg.From.Password = "xxx" + clone3, err := cfg.DecryptPassword() + c.Assert(err, NotNil) + + cfg.From.Password = "" + clone3, err = cfg.DecryptPassword() + c.Assert(clone3, DeepEquals, cfg) +} diff --git a/dm/worker/dm-worker.toml b/dm/worker/dm-worker.toml index f8b178970c..1e180431ba 100644 --- a/dm/worker/dm-worker.toml +++ b/dm/worker/dm-worker.toml @@ -29,7 +29,7 @@ enable-gtid = false [from] host = "127.0.0.1" user = "root" -password = "" +password = "Up8156jArvIPymkVC+5LxkAT6rek" port = 3306 #relay log purge strategy diff --git a/dm/worker/meta.go b/dm/worker/meta.go new file mode 100644 index 0000000000..bf7757c536 --- /dev/null +++ b/dm/worker/meta.go @@ -0,0 +1,157 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "bytes" + "io/ioutil" + "os" + "path" + "sync" + + "github.com/BurntSushi/toml" + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/errors" +) + +// Meta information contains +// * sub-task +type Meta struct { + SubTasks map[string]*config.SubTaskConfig `json:"sub-tasks" toml:"sub-tasks"` +} + +// Toml returns TOML format representation of config +func (m *Meta) Toml() (string, error) { + var b bytes.Buffer + enc := toml.NewEncoder(&b) + err := enc.Encode(m) + if err != nil { + return "", errors.Trace(err) + } + return b.String(), nil +} + +// DecodeFile loads and decodes config from file +func (m *Meta) DecodeFile(fpath string) error { + _, err := toml.DecodeFile(fpath, m) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// Decode loads config from file data +func (m *Meta) Decode(data string) error { + _, err := toml.Decode(data, m) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// FileMetaDB stores meta information in disk +type FileMetaDB struct { + lock sync.Mutex // we need to ensure only a thread can access to `metaDB` at a time + meta *Meta + path string +} + +// NewFileMetaDB returns a meta file db +func NewFileMetaDB(dir string) (*FileMetaDB, error) { + metaDB := &FileMetaDB{ + path: path.Join(dir, "meta"), + meta: &Meta{ + SubTasks: make(map[string]*config.SubTaskConfig), + }, + } + + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, errors.Annotatef(err, "create meta directory %s", dir) + } + + fd, err := os.Open(metaDB.path) + if os.IsNotExist(err) { + log.Warnf("failed to open meta file %s, going to create a new one: %v", metaDB.path, err) + return metaDB, nil + } else if err != nil { + return nil, errors.Trace(err) + } + defer fd.Close() + + err = metaDB.meta.DecodeFile(metaDB.path) + if err != nil { + return metaDB, errors.Trace(err) + } + + return metaDB, nil +} + +// Close closes meta DB +func (metaDB *FileMetaDB) Close() error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + return errors.Trace(metaDB.save()) +} + +func (metaDB *FileMetaDB) save() error { + serialized, err := metaDB.meta.Toml() + if err != nil { + return errors.Trace(err) + } + + if err := ioutil.WriteFile(metaDB.path, []byte(serialized), 0644); err != nil { + return errors.Trace(err) + } + return nil +} + +// Get returns `Meta` object +func (metaDB *FileMetaDB) Get() *Meta { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + meta := &Meta{ + SubTasks: make(map[string]*config.SubTaskConfig), + } + + for name, task := range metaDB.meta.SubTasks { + meta.SubTasks[name] = task + } + + return meta +} + +// Set sets subtask in Meta +func (metaDB *FileMetaDB) Set(subTask *config.SubTaskConfig) error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + metaDB.meta.SubTasks[subTask.Name] = subTask + + return errors.Trace(metaDB.save()) +} + +// Del deletes subtask in Meta +func (metaDB *FileMetaDB) Del(name string) error { + metaDB.lock.Lock() + defer metaDB.lock.Unlock() + + delete(metaDB.meta.SubTasks, name) + + return errors.Trace(metaDB.save()) +} diff --git a/dm/worker/meta_test.go b/dm/worker/meta_test.go new file mode 100644 index 0000000000..631892ef07 --- /dev/null +++ b/dm/worker/meta_test.go @@ -0,0 +1,60 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package worker + +import ( + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/dm/dm/config" +) + +func TestWorker(t *testing.T) { + TestingT(t) +} + +type testWorker struct{} + +var _ = Suite(&testWorker{}) + +func (t *testWorker) TestFileMetaDB(c *C) { + dir := c.MkDir() + + metaDB, err := NewFileMetaDB(dir) + c.Assert(err, IsNil) + c.Assert(metaDB.meta.SubTasks, HasLen, 0) + + meta := metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 0) + + err = metaDB.Set(&config.SubTaskConfig{ + Name: "task1", + }) + c.Assert(err, IsNil) + + meta = metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 1) + c.Assert(meta.SubTasks["task1"], NotNil) + + c.Assert(metaDB.Close(), IsNil) + + metaDB, err = NewFileMetaDB(dir) + c.Assert(err, IsNil) + c.Assert(metaDB.meta.SubTasks, HasLen, 1) + c.Assert(meta.SubTasks["task1"], NotNil) + + c.Assert(metaDB.Del("task1"), IsNil) + meta = metaDB.Get() + c.Assert(meta.SubTasks, HasLen, 0) +} diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 597642487a..6caa8ebbd8 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -43,21 +43,22 @@ type RelayHolder struct { // NewRelayHolder creates a new RelayHolder func NewRelayHolder(cfg *Config) *RelayHolder { + clone, _ := cfg.DecryptPassword() relayCfg := &relay.Config{ - EnableGTID: cfg.EnableGTID, - AutoFixGTID: cfg.AutoFixGTID, - Flavor: cfg.Flavor, - RelayDir: cfg.RelayDir, - ServerID: cfg.ServerID, - Charset: cfg.Charset, + EnableGTID: clone.EnableGTID, + AutoFixGTID: clone.AutoFixGTID, + Flavor: clone.Flavor, + RelayDir: clone.RelayDir, + ServerID: clone.ServerID, + Charset: clone.Charset, From: relay.DBConfig{ - Host: cfg.From.Host, - Port: cfg.From.Port, - User: cfg.From.User, - Password: cfg.From.Password, + Host: clone.From.Host, + Port: clone.From.Port, + User: clone.From.User, + Password: clone.From.Password, }, - BinLogName: cfg.RelayBinLogName, - BinlogGTID: cfg.RelayBinlogGTID, + BinLogName: clone.RelayBinLogName, + BinlogGTID: clone.RelayBinlogGTID, } h := &RelayHolder{ diff --git a/dm/worker/server.go b/dm/worker/server.go index f02040d862..e4d8149805 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -149,6 +149,14 @@ func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) }, nil } + if err = s.worker.meta.Set(cfg); err != nil { + log.Errorf("[server] insert task %s into meta: %v", cfg, errors.ErrorStack(err)) + return &pb.CommonWorkerResponse{ + Result: false, + Msg: fmt.Sprintf("insert task %s into meta: %v", cfg, errors.ErrorStack(err)), + }, nil + } + err = s.worker.StartSubTask(cfg) if err != nil { log.Errorf("[server] start sub task %s error %v", cfg.Name, errors.ErrorStack(err)) @@ -177,7 +185,12 @@ func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskReque var err error switch req.Op { case pb.TaskOp_Stop: - err = s.worker.StopSubTask(name) + if err = s.worker.meta.Del(name); err != nil { + resp.Msg = fmt.Sprintf("update task %s into meta: %v", name, errors.ErrorStack(err)) + log.Errorf(resp.Msg) + } else { + err = s.worker.StopSubTask(name) + } case pb.TaskOp_Pause: err = s.worker.PauseSubTask(name) case pb.TaskOp_Resume: @@ -211,6 +224,15 @@ func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest }, nil } + if err = s.worker.meta.Set(cfg); err != nil { + errMsg := fmt.Sprintf("[server] update task %s into meta: %v", cfg, errors.ErrorStack(err)) + log.Errorf(errMsg) + return &pb.CommonWorkerResponse{ + Result: false, + Msg: errMsg, + }, nil + } + err = s.worker.UpdateSubTask(cfg) if err != nil { log.Errorf("[server] update sub task %s error %v", cfg.Name, errors.ErrorStack(err)) diff --git a/dm/worker/worker.go b/dm/worker/worker.go index 75da749c9a..d7c70fa631 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/tracing" - "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay/purger" ) @@ -54,7 +53,9 @@ type Worker struct { subTasks map[string]*SubTask relayHolder *RelayHolder relayPurger *purger.Purger - tracer *tracing.Tracer + + meta *FileMetaDB + tracer *tracing.Tracer } // NewWorker creates a new Worker @@ -89,6 +90,11 @@ func (w *Worker) Init() error { InitConditionHub(w) + w.meta, err = NewFileMetaDB(w.cfg.MetaDir) + if err != nil { + return errors.Trace(err) + } + return nil } @@ -106,6 +112,14 @@ func (w *Worker) Start() { // start purger w.relayPurger.Start() + // restore tasks + meta := w.meta.Get() + for taskName, subtask := range meta.SubTasks { + if err := w.StartSubTask(subtask); err != nil { + panic(fmt.Sprintf("restore task %s (%s) in worker starting: %v", taskName, subtask, err)) + } + } + // start tracer if w.tracer.Enable() { w.tracer.Start() @@ -175,22 +189,10 @@ func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error { w.copyConfigFromWorker(cfg) log.Infof("[worker] starting sub task with config: %v", cfg) + cloneCfg, _ := cfg.DecryptPassword() - // try decrypt password for To DB - var ( - pswdTo string - err error - ) - if len(cfg.To.Password) > 0 { - pswdTo, err = utils.Decrypt(cfg.To.Password) - if err != nil { - return errors.Trace(err) - } - } - cfg.To.Password = pswdTo - - st := NewSubTask(cfg) - err = st.Init() + st := NewSubTask(cloneCfg) + err := st.Init() if err != nil { return errors.Trace(err) } @@ -584,12 +586,13 @@ func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error { } log.Infof("[worker] update relay configure with config: %v", newCfg) + cloneCfg, _ := newCfg.DecryptPassword() // Update SubTask configure for _, st := range w.subTasks { cfg := config.NewSubTaskConfig() - cfg.From = newCfg.From + cfg.From = cloneCfg.From stage := st.Stage() if stage == pb.Stage_Paused { diff --git a/mydumper/mydumper.go b/mydumper/mydumper.go index b18f9bf738..b042e11ccf 100644 --- a/mydumper/mydumper.go +++ b/mydumper/mydumper.go @@ -68,7 +68,6 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) { // Cmd cannot be reused, so we create a new cmd when begin processing cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...) - log.Infof("[mydumper] starting mydumper using args %v", cmd.Args) output, err := cmd.CombinedOutput() if err != nil { @@ -149,6 +148,7 @@ func (m *Mydumper) IsFreshTask() (bool, error) { func (m *Mydumper) constructArgs() []string { cfg := m.cfg db := cfg.From + ret := []string{ "--host", db.Host, @@ -156,8 +156,6 @@ func (m *Mydumper) constructArgs() []string { strconv.Itoa(db.Port), "--user", db.User, - "--password", - db.Password, "--outputdir", cfg.Dir, // use LoaderConfig.Dir as --outputdir } @@ -177,6 +175,10 @@ func (m *Mydumper) constructArgs() []string { if len(extraArgs) > 0 { ret = append(ret, ParseArgLikeBash(extraArgs)...) } + + log.Infof("[mydumper] create mydumper using args %v", ret) + + ret = append(ret, "--password", db.Password) return ret } diff --git a/mydumper/mydumper_test.go b/mydumper/mydumper_test.go index 512d3dd5c3..bf14427b4e 100644 --- a/mydumper/mydumper_test.go +++ b/mydumper/mydumper_test.go @@ -53,9 +53,10 @@ func (m *testMydumperSuite) SetUpSuite(c *C) { } func (m *testMydumperSuite) TestArgs(c *C) { - expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root --password 123 " + + expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root " + "--outputdir ./dumped_data --threads 4 --chunk-filesize 64 --skip-tz-utc " + - "--regex ^(?!(mysql|information_schema|performance_schema))") + "--regex ^(?!(mysql|information_schema|performance_schema)) " + + "--password 123") m.cfg.MydumperConfig.ExtraArgs = "--regex '^(?!(mysql|information_schema|performance_schema))'" mydumper := NewMydumper(m.cfg) args := mydumper.constructArgs() diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go index 016af3daeb..e3a6aa5c1c 100644 --- a/pkg/tracing/tracer.go +++ b/pkg/tracing/tracer.go @@ -14,12 +14,12 @@ package tracing import ( + "context" "sync" "time" "github.com/pingcap/errors" "github.com/siddontang/go/sync2" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/pb" diff --git a/pkg/tracing/tracer_test.go b/pkg/tracing/tracer_test.go index 8fb5bbe4ce..b7cd566a06 100644 --- a/pkg/tracing/tracer_test.go +++ b/pkg/tracing/tracer_test.go @@ -14,6 +14,7 @@ package tracing import ( + "context" "fmt" "net" "sync" @@ -26,7 +27,6 @@ import ( "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" - "golang.org/x/net/context" "google.golang.org/grpc" "github.com/pingcap/dm/dm/common" diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 60b407c0ee..133d53c8be 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -262,3 +262,21 @@ func IsErrDupEntry(err error) bool { func IsNoSuchThreadError(err error) bool { return IsMySQLError(err, tmysql.ErrNoSuchThread) } + +// OpenDBWithEncryptedPwd returns a db fd with encrypted password +/*func OpenDBWithEncryptedPwd(host string, port int, user, password, timeout string) (db *sql.DB, err error) { + if len(password) > 0 { + password, err = Decrypt(password) + if err != nil { + return nil, errors.Annotatef(err, "can not decrypt password %s of user %s for db %s:%d", password, user, host, port) + } + } + + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&interpolateParams=true&readTimeout=%s", user, password, host, port, timeout) + db, err = sql.Open("mysql", dbDSN) + if err != nil { + return nil, errors.Trace(err) + } + + return +}*/ diff --git a/pkg/utils/encrypt.go b/pkg/utils/encrypt.go index c9d4c13616..00017448f2 100644 --- a/pkg/utils/encrypt.go +++ b/pkg/utils/encrypt.go @@ -34,12 +34,12 @@ func Encrypt(plaintext string) (string, error) { func Decrypt(ciphertextB64 string) (string, error) { ciphertext, err := base64.StdEncoding.DecodeString(ciphertextB64) if err != nil { - return "", errors.Trace(err) + return "", errors.Annotatef(err, "can not decrypt password %s", ciphertextB64) } plaintext, err := encrypt.Decrypt(ciphertext) if err != nil { - return "", errors.Trace(err) + return "", errors.Annotatef(err, "can not decrypt password %s", ciphertextB64) } return string(plaintext), nil } diff --git a/tests/_utils/run_dm_worker b/tests/_utils/run_dm_worker index ae948ed604..e4055e5257 100755 --- a/tests/_utils/run_dm_worker +++ b/tests/_utils/run_dm_worker @@ -13,11 +13,11 @@ mkdir -p $workdir/relay_log $workdir/dumped_data $workdir/log $workdir/bin PWD=$(pwd) binary=$PWD/bin/dm-worker.test -ln -s $PWD/bin/mydumper $workdir/bin/mydumper +[ -f $workdir/bin/mydumper ] || ln -s $PWD/bin/mydumper $workdir/bin/mydumper echo "[$(date)] <<<<<< START DM-WORKER on port $port, config: $conf >>>>>>" cd $workdir -$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.out" DEVEL \ +$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.worker.$port.$(date +"%s").out" DEVEL \ --worker-addr=:$port --relay-dir="$workdir/relay_log" \ --log-file="$workdir/log/dm-worker.log" -L=debug --config="$conf" \ > $workdir/log/stdout.log 2>&1 & diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 1d5f85dbe0..1488d38f2c 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -27,6 +27,13 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + pkill -hup dm-worker.test 2>/dev/null || true + wait_process_exit dm-worker.test + + # restart dm-worker + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2