Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

restore task after dm-workers restart #88

Merged
merged 24 commits into from
Mar 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *
}

for _, cfg := range cfgs {
// we have verify it in subtask config
replica, _ := cfg.DecryptPassword()
c.instances = append(c.instances, &mysqlInstance{
cfg: cfg,
cfg: replica,
})
}

Expand Down Expand Up @@ -118,7 +120,6 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.From.User,
Password: instance.cfg.From.Password,
}

instance.sourceDB, err = dbutil.OpenDB(*instance.sourceDBinfo)
if err != nil {
return errors.Trace(err)
Expand All @@ -130,13 +131,6 @@ func (c *Checker) Init() (err error) {
User: instance.cfg.To.User,
Password: instance.cfg.To.Password,
}
if len(instance.targetDBInfo.Password) > 0 {
pswd, err2 := utils.Decrypt(instance.targetDBInfo.Password)
if err2 != nil {
return errors.Annotatef(err2, "can not decrypt password %s", instance.targetDBInfo.Password)
}
instance.targetDBInfo.Password = pswd
}
instance.targetDB, err = dbutil.OpenDB(*instance.targetDBInfo)
if err != nil {
return errors.Trace(err)
Expand Down
48 changes: 47 additions & 1 deletion dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (c *SubTaskConfig) adjust() error {
}
}

return nil
_, err := c.DecryptPassword()
return err
}

// Parse parses flag definitions from the argument list.
Expand Down Expand Up @@ -289,3 +290,48 @@ func (c *SubTaskConfig) Parse(arguments []string) error {

return errors.Trace(c.adjust())
}

// DecryptPassword tries to decrypt db password in config
func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) {
clone, err := c.Clone()
if err != nil {
return nil, errors.Trace(err)
}

var (
pswdTo string
pswdFrom string
)
if len(clone.To.Password) > 0 {
pswdTo, err = utils.Decrypt(clone.To.Password)
if err != nil {
return nil, errors.Annotatef(err, "downstream DB")
}
}
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return nil, errors.Annotatef(err, "source DB")
}
}
clone.From.Password = pswdFrom
clone.To.Password = pswdTo

return clone, nil
}

// Clone returns a replica of SubTaskConfig
func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) {
content, err := c.Toml()
if err != nil {
return nil, errors.Trace(err)
}

clone := &SubTaskConfig{}
_, err = toml.Decode(content, clone)
if err != nil {
return nil, errors.Trace(err)
}

return clone, nil
}
52 changes: 52 additions & 0 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
. "github.com/pingcap/check"
)

func (t *testConfig) TestSubTask(c *C) {
cfg := &SubTaskConfig{
From: DBConfig{
Host: "127.0.0.1",
Port: 3306,
User: "root",
Password: "Up8156jArvIPymkVC+5LxkAT6rek",
},
To: DBConfig{
Host: "127.0.0.1",
Port: 4306,
User: "root",
Password: "",
},
}

clone1, err := cfg.Clone()
c.Assert(err, IsNil)
c.Assert(cfg, DeepEquals, clone1)

clone1.From.Password = "1234"
clone2, err := cfg.DecryptPassword()
c.Assert(err, IsNil)
c.Assert(clone2, DeepEquals, clone1)

cfg.From.Password = "xxx"
clone3, err := cfg.DecryptPassword()
c.Assert(err, NotNil)

cfg.From.Password = ""
clone3, err = cfg.DecryptPassword()
c.Assert(clone3, DeepEquals, cfg)
}
2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C
if err != nil {
return &pb.CheckTaskResponse{
Result: false,
Msg: errors.Cause(err).Error(),
Msg: errors.ErrorStack(err),
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dm/tracer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tracer

import (
"context"
"net"
"net/http"
"sync"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
"github.com/soheilhy/cmux"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/common"
Expand Down
78 changes: 39 additions & 39 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Config struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
MetaDir string `toml:"meta-dir" json:"meta-dir"`
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
ServerID int `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`
Expand Down Expand Up @@ -118,29 +119,12 @@ func (c *Config) String() string {
// Toml returns TOML format representation of config
func (c *Config) Toml() (string, error) {
var b bytes.Buffer
var pswd string
var err error

enc := toml.NewEncoder(&b)
if len(c.From.Password) > 0 {
pswd, err = utils.Encrypt(c.From.Password)
if err != nil {
return "", errors.Annotatef(err, "can not encrypt password %s", c.From.Password)
}
}
c.From.Password = pswd

err = enc.Encode(c)
err := toml.NewEncoder(&b).Encode(c)
if err != nil {
log.Errorf("[worker] marshal config to toml error %v", err)
}
if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
if err != nil {
return "", errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
}
}
c.From.Password = pswd

return string(b.String()), nil
}

Expand Down Expand Up @@ -189,15 +173,9 @@ func (c *Config) Parse(arguments []string) error {
return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0))
}

// try decrypt password
var pswd string
if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
if err != nil {
return errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
}
if len(c.MetaDir) == 0 {
c.MetaDir = "./dm_worker_meta"
}
c.From.Password = pswd

// assign tracer id to source id
c.Tracer.Source = c.SourceID
Expand All @@ -211,25 +189,40 @@ func (c *Config) verify() error {
return errors.Errorf("dm-worker should bind a non-empty source ID which represents a MySQL/MariaDB instance or a replica group. \n notice: if you use old version dm-ansible, please update to newest version.")
}

var err error
if len(c.RelayBinLogName) > 0 {
_, err := streamer.GetBinlogFileIndex(c.RelayBinLogName)
_, err = streamer.GetBinlogFileIndex(c.RelayBinLogName)
if err != nil {
return errors.Annotatef(err, "relay-binlog-name %s", c.RelayBinLogName)
}
}
if len(c.RelayBinlogGTID) > 0 {
_, err := gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
_, err = gtid.ParserGTID(c.Flavor, c.RelayBinlogGTID)
if err != nil {
return errors.Annotatef(err, "relay-binlog-gtid %s", c.RelayBinlogGTID)
}
}

_, err = c.DecryptPassword()
if err != nil {
return errors.Trace(err)
}

return nil
}

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) error {
_, err := toml.DecodeFile(path, c)
return errors.Trace(err)
if err != nil {
return errors.Trace(err)
}

err = c.verify()
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
return nil
}

// UpdateConfigFile write configure to local file
Expand All @@ -246,25 +239,32 @@ func (c *Config) UpdateConfigFile(content string) error {

// Reload reload configure from ConfigFile
func (c *Config) Reload() error {
var pswd string
var err error

if c.ConfigFile == "" {
c.ConfigFile = "dm-worker-config.bak"
}

err = c.configFromFile(c.ConfigFile)
err := c.configFromFile(c.ConfigFile)
if err != nil {
return errors.Trace(err)
}

if len(c.From.Password) > 0 {
pswd, err = utils.Decrypt(c.From.Password)
return nil
}

// DecryptPassword returns a decrypted config replica in config
func (c *Config) DecryptPassword() (*Config, error) {
clone := c.Clone()
var (
pswdFrom string
err error
)
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return errors.Annotatef(err, "can not decrypt password %s", c.From.Password)
return nil, errors.Trace(err)
}
}
c.From.Password = pswd
clone.From.Password = pswdFrom

return nil
return clone, nil
}
42 changes: 42 additions & 0 deletions dm/worker/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
. "github.com/pingcap/check"
)

func (t *testWorker) TestConfig(c *C) {
cfg := &Config{}

err := cfg.configFromFile("./dm-worker.toml")
c.Assert(err, IsNil)
c.Assert(cfg.SourceID, Equals, "mysql-replica-01")

clone1 := cfg.Clone()
c.Assert(cfg, DeepEquals, clone1)

clone1.From.Password = "1234"
clone2, err := cfg.DecryptPassword()
c.Assert(err, IsNil)
c.Assert(clone2, DeepEquals, clone1)

cfg.From.Password = "xxx"
clone3, err := cfg.DecryptPassword()
c.Assert(err, NotNil)

cfg.From.Password = ""
clone3, err = cfg.DecryptPassword()
c.Assert(clone3, DeepEquals, cfg)
}
2 changes: 1 addition & 1 deletion dm/worker/dm-worker.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ enable-gtid = false
[from]
host = "127.0.0.1"
user = "root"
password = ""
password = "Up8156jArvIPymkVC+5LxkAT6rek"
port = 3306

#relay log purge strategy
Expand Down
Loading