Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

restore task after dm-workers restart #88

Merged
merged 24 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.From.User,
Password: instance.cfg.From.Password,
}

if len(instance.sourceDBinfo.Password) > 0 {
pswd, err2 := utils.Decrypt(instance.sourceDBinfo.Password)
if err2 != nil {
return errors.Annotatef(err2, "can not decrypt password %s", instance.sourceDBinfo.Password)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}
instance.sourceDBinfo.Password = pswd
}
instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo)
if err != nil {
return errors.Trace(err)
Expand Down
48 changes: 47 additions & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (c *SubTaskConfig) adjust() error {
}
}

return nil
_, err := c.DecryptPassword()
return err
}

// Parse parses flag definitions from the argument list.
Expand Down Expand Up @@ -289,3 +290,48 @@ func (c *SubTaskConfig) Parse(arguments []string) error {

return errors.Trace(c.adjust())
}

// DecryptPassword tries to decrypt db password in config
func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) {
clone, err := c.Clone()
if err != nil {
return nil, errors.Trace(err)
}

var (
pswdTo string
pswdFrom string
)
if len(clone.To.Password) > 0 {
pswdTo, err = utils.Decrypt(clone.To.Password)
if err != nil {
return nil, errors.Annotatef(err, "can not decrypt password %s of downstream DB", clone.To.Password)
}
}
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return nil, errors.Annotatef(err, "can not decrypt password %s of source DB", clone.From.Password)
}
}
clone.From.Password = pswdFrom
clone.To.Password = pswdTo

return clone, nil
}

// Clone returns a replica of SubTaskConfig
func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {
content, err := c.Toml()
if err != nil {
return nil, errors.Trace(err)
}

clone := &SubTaskConfig{}
_, err = toml.Decode(content, clone)
if err != nil {
return nil, errors.Trace(err)
}

return clone, nil
}
52 changes: 52 additions & 0 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
. "github.com/pingcap/check"
)

func (t *testConfig) TestSubTask(c *C) {
cfg := &SubTaskConfig{
From: DBConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "Up8156jArvIPymkVC+5LxkAT6rek",
},
To: DBConfig{
Host: "127.0.0.1",
Port: 4306,
User: "root",
Password: "",
},
}

clone1, err := cfg.Clone()
c.Assert(err, IsNil)
c.Assert(cfg, DeepEquals, clone1)

clone1.From.Password = "1234"
clone2, err := cfg.DecryptPassword()
c.Assert(err, IsNil)
c.Assert(clone2, DeepEquals, clone1)

cfg.From.Password = "xxx"
clone3, err := cfg.DecryptPassword()
c.Assert(err, NotNil)

cfg.From.Password = ""
clone3, err = cfg.DecryptPassword()
c.Assert(clone3, DeepEquals, cfg)
}
78 changes: 38 additions & 40 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Config struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
ServerID int `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`
Expand Down Expand Up @@ -118,29 +119,12 @@ func (c *Config) String() string {
// Toml returns TOML format representation of config
func (c *Config) Toml() (string, error) {
var b bytes.Buffer
var pswd string
var err error

enc := toml.NewEncoder(&b)
if len(c.From.Password) > 0 {
pswd, err = utils.Encrypt(c.From.Password)
if err != nil {
return "", errors.Annotatef(err, "can not encrypt password %s", c.From.Password)
}
}
c.From.Password = pswd

err = enc.Encode(c)
err := toml.NewEncoder(&b).Encode(c)
if err != nil {
log.Errorf("[worker] marshal config to toml error %v", err)
}
if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
if err != nil {
return "", errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
}
}
c.From.Password = pswd

return string(b.String()), nil
}

Expand Down Expand Up @@ -189,15 +173,9 @@ func (c *Config) Parse(arguments []string) error {
return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0))
}

// try decrypt password
var pswd string
if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
if err != nil {
return errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
}
if len(c.MetaDir) == 0 {
c.MetaDir = "./dm_worker_meta"
}
c.From.Password = pswd

// assign tracer id to source id
c.Tracer.Source = c.SourceID
Expand All @@ -211,25 +189,36 @@ func (c *Config) verify() error {
return errors.Errorf("dm-worker should bind a non-empty source ID which represents a MySQL/MariaDB instance or a replica group. \n notice: if you use old version dm-ansible, please update to newest version.")
}

var err error
if len(c.RelayBinLogName) > 0 {
_, err := streamer.GetBinlogFileIndex(c.RelayBinLogName)
_, err = streamer.GetBinlogFileIndex(c.RelayBinLogName)
if err != nil {
return errors.Annotatef(err, "relay-binlog-name %s", c.RelayBinLogName)
}
}
if len(c.RelayBinlogGTID) > 0 {
_, err := gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
_, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
if err != nil {
return errors.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID)
}
}
return nil

_, err = c.DecryptPassword()
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better return errors.Trace(err)

}

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) error {
_, err := toml.DecodeFile(path, c)
return errors.Trace(err)
if err != nil {
return errors.Trace(err)
}

err = c.verify()
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
return nil
}

// UpdateConfigFile write configure to local file
Expand All @@ -246,25 +235,34 @@ func (c *Config) UpdateConfigFile(content string) error {

// Reload reload configure from ConfigFile
func (c *Config) Reload() error {
var pswd string
var err error

if c.ConfigFile == "" {
c.ConfigFile = "dm-worker-config.bak"
}

err = c.configFromFile(c.ConfigFile)
err := c.configFromFile(c.ConfigFile)
if err != nil {
return errors.Trace(err)
}

if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
return nil
}

// DecryptPassword return a decrypted config replica in config
IANTHEREAL marked this conversation as resolved.
Show resolved Hide resolved
func (c *Config) DecryptPassword() (*Config, error) {
// try decrypt password for To DB
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

clone := c.Clone()
var (
pswdFrom string
err error
)
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
return nil, errors.Annotatef(err, "can not decrypt password %s", clone.From.Password)
}
}
c.From.Password = pswd
clone.From.Password = pswdFrom

return nil
return clone, nil
}
42 changes: 42 additions & 0 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
. "github.com/pingcap/check"
)

func (t *testWorker) TestConfig(c *C) {
cfg := &Config{}

err := cfg.configFromFile("./dm-worker.toml")
c.Assert(err, IsNil)
c.Assert(cfg.SourceID, Equals, "mysql-replica-01")

clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)

clone1.From.Password = "1234"
clone2, err := cfg.DecryptPassword()
c.Assert(err, IsNil)
c.Assert(clone2, DeepEquals, clone1)

cfg.From.Password = "xxx"
clone3, err := cfg.DecryptPassword()
c.Assert(err, NotNil)

cfg.From.Password = ""
clone3, err = cfg.DecryptPassword()
c.Assert(clone3, DeepEquals, cfg)
}
2 changes: 1 addition & 1 deletion dm/worker/dm-worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ enable-gtid = false
[from]
host = "127.0.0.1"
user = "root"
password = ""
password = "Up8156jArvIPymkVC+5LxkAT6rek"
port = 3306

#relay log purge strategy
Expand Down
Loading