Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

ha: refactor the schedule model #473

Merged
merged 125 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
125 commits
Select commit Hold shift + click to select a range
48408ae
ha: add some design for the new HA model
csuzhangxc Feb 10, 2020
aa5b69f
Update pkg/ha/doc.go
csuzhangxc Feb 10, 2020
ca3191d
ha: add some design for the new HA model
csuzhangxc Feb 10, 2020
e6643f0
ha: add etcd operation sample for source
csuzhangxc Feb 11, 2020
c9000f2
ha: add etcd operation sample for source
csuzhangxc Feb 11, 2020
80964fa
add subtask
lichunzhu Feb 11, 2020
98a225e
address comments
lichunzhu Feb 11, 2020
4308f8f
ha: add etcd operation sample for dm-worker info
csuzhangxc Feb 11, 2020
4232b88
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 11, 2020
a1b76b3
ha: update copyright year
csuzhangxc Feb 11, 2020
5953a80
ha: update wording
csuzhangxc Feb 11, 2020
b69de45
return map for subtask key
lichunzhu Feb 11, 2020
1dcff3b
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 11, 2020
1ec15dc
fix bug
lichunzhu Feb 11, 2020
3119076
ha: add etcd operation for source bound
csuzhangxc Feb 11, 2020
c4c81d6
Merge remote-tracking branch 'remotes/origin/ha-dev' into ha-refactor
csuzhangxc Feb 11, 2020
7c97eed
add keepalive
lichunzhu Feb 11, 2020
580b3b6
refine code
lichunzhu Feb 11, 2020
5a0a934
fix hound
lichunzhu Feb 12, 2020
caa378a
fix make check
lichunzhu Feb 12, 2020
e0c07a2
ha: add etcd operation for stage
csuzhangxc Feb 12, 2020
91b1d01
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 12, 2020
6ec665d
ha: report error for watcher through chan
csuzhangxc Feb 12, 2020
af3687c
add errCh, support multi put
lichunzhu Feb 12, 2020
ef8a5d7
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 12, 2020
d012095
ha: add etcd operation for operations in one txn
csuzhangxc Feb 12, 2020
0a234be
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 12, 2020
55cd37c
ha: refine code
csuzhangxc Feb 12, 2020
7c2fc72
refine code
lichunzhu Feb 12, 2020
0a5893e
support getting alive workers
lichunzhu Feb 12, 2020
428a94e
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 12, 2020
eae1b1c
refine code
lichunzhu Feb 12, 2020
ed12d30
address comments
lichunzhu Feb 13, 2020
6ef1a7f
refine wait time
lichunzhu Feb 13, 2020
44b8d54
add revision support
lichunzhu Feb 13, 2020
d903258
add ut for rev
lichunzhu Feb 13, 2020
d27aaf7
support revision in stage
lichunzhu Feb 13, 2020
4727c07
refine stage revision test
lichunzhu Feb 13, 2020
d372f9f
support etcd operation for subtask and relay
lichunzhu Feb 13, 2020
cc47624
add revision for bound and source
lichunzhu Feb 14, 2020
6e91443
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
b77d34e
ha: get all bound relationship
csuzhangxc Feb 14, 2020
bb6be0c
refine revision for event
lichunzhu Feb 14, 2020
cb0858f
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
278046f
refine comments
lichunzhu Feb 14, 2020
4436cec
support etcd operations on dm-worker
lichunzhu Feb 14, 2020
9ac916c
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
2ecdc56
fix check
lichunzhu Feb 14, 2020
535d059
scheduler: add worker agent; add scheduler skeleton
csuzhangxc Feb 14, 2020
b874b79
scheduler: handle source config; refine code
csuzhangxc Feb 14, 2020
295d020
scheduler: record bounds and unbounds; add test steps
csuzhangxc Feb 14, 2020
277f130
ha: add delete API for source bound
csuzhangxc Feb 15, 2020
ce49d96
scheduler: bound/unbound when the worker become online/offline
csuzhangxc Feb 15, 2020
cb4f16f
scheduler: refine tests
csuzhangxc Feb 15, 2020
935eccf
scheduler: put relay stage when put the first bound; add some code fo…
csuzhangxc Feb 15, 2020
eb0925f
scheduler: support update relay stage.
csuzhangxc Feb 15, 2020
e324398
scheduler: support add subtasks
csuzhangxc Feb 15, 2020
d9bea9d
ha: add get all for source config and relay stage.
csuzhangxc Feb 15, 2020
4198b1f
ha: add get all for subtask config
csuzhangxc Feb 15, 2020
5b744a6
scheduler: support update subtask; recover config and stage for sourc…
csuzhangxc Feb 15, 2020
e6f6a3e
refine worker UT
lichunzhu Feb 16, 2020
30d33e4
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 16, 2020
73cee97
ha: address comments
csuzhangxc Feb 16, 2020
b65ece5
move decrypt config to init
lichunzhu Feb 16, 2020
f8d7c75
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 16, 2020
964aeb6
*: refine bound source to worker
csuzhangxc Feb 16, 2020
2f7bff7
*: support remove subtask; update remove source
csuzhangxc Feb 16, 2020
4207d9e
scheduler: fix unbounds when removing source
csuzhangxc Feb 16, 2020
7711e6f
scheduler: add SendRequest API for worker agent
csuzhangxc Feb 16, 2020
19e329b
scheduler: add more comments and test cases
csuzhangxc Feb 16, 2020
bdbe56f
scheduler: address comment to fix deadlock
csuzhangxc Feb 17, 2020
713640e
remove coordinator, switch to scheduler api
lichunzhu Feb 17, 2020
5d07e1c
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 17, 2020
c2a7651
refine clear env method
lichunzhu Feb 17, 2020
095f7a4
clear etcd info
lichunzhu Feb 17, 2020
0a9d45c
refine UT
lichunzhu Feb 17, 2020
7f841c1
refine master UT
lichunzhu Feb 17, 2020
8a2c88c
extract ClearTestInfoOperation
lichunzhu Feb 17, 2020
4a6dcb4
set mysqlConfig password to empty
lichunzhu Feb 17, 2020
595a11a
fix
lichunzhu Feb 17, 2020
0794bb0
*: remove the code about Coordinator
csuzhangxc Feb 17, 2020
ab12458
operate source before keepalive, refine logs and add purgeRelayDir
lichunzhu Feb 17, 2020
b7a863a
Merge branch 'ha-refactor' of https://github.com/lichunzhu/dm into ha…
lichunzhu Feb 17, 2020
88afe72
*: rename `MySQLConfig` to `SourceConfig`, `operate-worker` to `opera…
csuzhangxc Feb 17, 2020
7c7be03
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 17, 2020
1795377
worker: fix merge
csuzhangxc Feb 17, 2020
af535f3
address comment
lichunzhu Feb 17, 2020
06b259b
switch to mock
lichunzhu Feb 17, 2020
fe8d7a3
*: encode check's config to toml; log the error about start work fail
csuzhangxc Feb 17, 2020
6128026
make worker not relying on downstream tidb
lichunzhu Feb 17, 2020
06a0332
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 17, 2020
53890fd
*: rename `operate-worker` to `operate-source`, `dm-mysql.toml` to `s…
csuzhangxc Feb 17, 2020
ac2da20
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 17, 2020
98da039
fix parse problem
lichunzhu Feb 17, 2020
b445e7c
fix parse problem again
lichunzhu Feb 18, 2020
0f5729f
scheduler: support add the same worker multiple times
csuzhangxc Feb 18, 2020
79f0991
add comments and UT for source configss
lichunzhu Feb 18, 2020
d21c2b7
Merge branch 'ha-refactor' of https://github.com/lichunzhu/dm into ha…
lichunzhu Feb 18, 2020
f41ef63
fix UT error
lichunzhu Feb 18, 2020
2783ba6
address comments
lichunzhu Feb 18, 2020
91bc029
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 18, 2020
6e0c686
fix all_mode integration test, remove disable heartbeat
lichunzhu Feb 18, 2020
e5302d4
comment all enable-heartbeat in integration tests
lichunzhu Feb 18, 2020
d543271
refine dmctl_basic UT
lichunzhu Feb 18, 2020
badb6af
refine dmctl_basic UT part2
lichunzhu Feb 18, 2020
d8e34f5
refine dmctl_basic integration tests part.3
lichunzhu Feb 18, 2020
ead0984
tests: turn off relay_interrupt until compatible with relay again.
csuzhangxc Feb 18, 2020
c7e2296
tests: fix print_status
csuzhangxc Feb 18, 2020
230322c
tests: try to fix ha
csuzhangxc Feb 18, 2020
7899b28
tests: fix initial_unit
csuzhangxc Feb 18, 2020
39e9880
tests: try to fix ha
csuzhangxc Feb 18, 2020
633e6b6
refine dmctl_basic
lichunzhu Feb 18, 2020
10ac6e9
small fi
lichunzhu Feb 18, 2020
2f06e0f
tests: fix incremental_mode; update test_prepare
csuzhangxc Feb 18, 2020
9a05569
fix http_apis test
lichunzhu Feb 18, 2020
8a21f33
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 18, 2020
894c8f6
sleep after operation in http_apis
lichunzhu Feb 18, 2020
1033e72
disable relay in ha integration tests
lichunzhu Feb 18, 2020
d437200
tests: rename schema in dm_syncer; skip online DDL
csuzhangxc Feb 18, 2020
c7e3abe
refine start_task test
lichunzhu Feb 18, 2020
5b384b3
tests: revert online_ddl case; remove retry_cancel
csuzhangxc Feb 18, 2020
72ade95
Merge remote-tracking branch 'remotes/origin/ha-dev' into ha-refactor
csuzhangxc Feb 18, 2020
15aa987
*: fix merge
csuzhangxc Feb 18, 2020
2d4b386
*: fix merge
csuzhangxc Feb 18, 2020
355c674
tests: abort online_ddl
csuzhangxc Feb 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,9 @@ ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s"
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrMasterCoordinatorNotStart,[code=38041:class=dm-master:scope=internal:level=high],"coordinator does not start"
ErrMasterAcquireWorkerFailed,[code=38042:class=dm-master:scope=internal:level=medium],"acquire worker failed: %s"
ErrMasterAdvertiseAddrNotValid,[code=38043:class=dm-master:scope=internal:level=high],"advertise address %s not valid"
ErrMasterRequestIsNotForwardToLeader,[code=38044:class=dm-master:scope=internal:level=high],"master is not leader, and can't forward request to leader"
ErrMasterInvalidOperateTaskOp,[code=38041:class=dm-master:scope=internal:level=medium],"invalid op %s on task"
ErrMasterAdvertiseAddrNotValid,[code=38042:class=dm-master:scope=internal:level=high],"advertise address %s not valid"
ErrMasterRequestIsNotForwardToLeader,[code=38043:class=dm-master:scope=internal:level=high],"master is not leader, and can't forward request to leader"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down Expand Up @@ -422,3 +421,20 @@ ErrSchemaTrackerCannotGetTable,[code=44005:class=schema-tracker:scope=internal:l
ErrSchemaTrackerCannotExecDDL,[code=44006:class=schema-tracker:scope=internal:level=high],"cannot track DDL: %s"
ErrSchemaTrackerCannotFetchDownstreamTable,[code=44007:class=schema-tracker:scope=downstream:level=medium],"cannot fetch downstream table schema of `%s`.`%s` to initialize upstream schema `%s`.`%s` in schema tracker"
ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scope=internal:level=high],"cannot parse downstream table schema of `%s`.`%s` to initialize upstream schema `%s`.`%s` in schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high],"the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium],"the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium],"dm-worker with name %s already exists"
ErrSchedulerWorkerNotExist,[code=46004:class=scheduler:scope=internal:level=medium],"dm-worker with name %s not exists"
ErrSchedulerWorkerOnline,[code=46005:class=scheduler:scope=internal:level=medium],"dm-worker with name %s is still online, must shut it down first"
ErrSchedulerWorkerInvalidTrans,[code=46006:class=scheduler:scope=internal:level=medium],"invalid stage transformation for dm-worker %s, from %s to %s"
ErrSchedulerSourceCfgExist,[code=46007:class=scheduler:scope=internal:level=medium],"source config with ID %s already exists"
ErrSchedulerSourceCfgNotExist,[code=46008:class=scheduler:scope=internal:level=medium],"source config with ID %s not exists"
ErrSchedulerSourcesUnbound,[code=46009:class=dm-master:scope=internal:level=medium],"sources %v have not bound"
ErrSchedulerSourceOpTaskExist,[code=46010:class=dm-master:scope=internal:level=medium],"source with name % need to operate with tasks %v exist"
ErrSchedulerRelayStageInvalidUpdate,[code=46011:class=scheduler:scope=internal:level=medium],"invalid new expectant relay stage %s"
ErrSchedulerRelayStageSourceNotExist,[code=46012:class=scheduler:scope=internal:level=medium],"sources %v need to update expectant relay stage not exist"
ErrSchedulerMultiTask,[code=46013:class=scheduler:scope=internal:level=medium],"the scheduler cannot perform multiple different tasks %v in one operation"
ErrSchedulerSubTaskExist,[code=46014:class=scheduler:scope=internal:level=medium],"subtasks with name %s for sources %v already exist"
ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal:level=medium],"invalid new expectant subtask stage %s"
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium],"subtasks with name %s need to be operate not exist"
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium],"sources %v need to be operate not exist"
9 changes: 4 additions & 5 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,10 @@ func (oc *oldConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
return nil, errors.Trace(err)
}
newTask := &config.SubTaskConfig{
Name: "dm-syncer",
SourceID: "dm-syncer-from-old-config",
DisableHeartbeat: true,
Mode: config.ModeIncrement,
Meta: meta,
Name: "dm-syncer",
SourceID: "dm-syncer-from-old-config",
Mode: config.ModeIncrement,
Meta: meta,

LogLevel: oc.LogLevel,
LogFile: oc.LogFile,
Expand Down
17 changes: 12 additions & 5 deletions dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,26 @@ import (
var (
useOfClosedErrMsg = "use of closed network connection"
// WorkerRegisterKeyAdapter used to encode and decode register key.
// k/v: Encode(addr) -> name
// k/v: Encode(name) -> the information of the DM-worker node.
WorkerRegisterKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/r/")
// WorkerKeepAliveKeyAdapter used to encode and decode keepalive key.
// k/v: Encode(addr,name) -> time
// k/v: Encode(name) -> time
WorkerKeepAliveKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/a/")
// UpstreamConfigKeyAdapter store all config of which MySQL-task has not stopped.
// k/v: Encode(source-id) -> config
UpstreamConfigKeyAdapter KeyAdapter = keyEncoderDecoder("/dm-master/upstream/config/")
// UpstreamBoundWorkerKeyAdapter used to store address of worker in which MySQL-tasks which are running.
// k/v: Encode(addr) -> source-id
// k/v: Encode(name) -> the bound relationship.
UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/")
// UpstreamSubTaskKeyAdapter used to store SubTask which are subscribing data from MySQL source.
// k/v: Encode(source-id, task-name) -> SubTaskConfig
UpstreamSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/upstream/subtask/")
// StageRelayKeyAdapter used to store the running stage of the relay.
// k/v: Encode(source-id) -> the running stage of the relay.
StageRelayKeyAdapter KeyAdapter = keyEncoderDecoder("/dm-master/stage/relay/")
// StageSubTaskKeyAdapter used to store the running stage of the subtask.
// k/v: Encode(source-id, task-name) -> the running stage of the subtask.
StageSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/subtask/")

// ShardDDLPessimismInfoKeyAdapter used to store shard DDL info in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL info
Expand All @@ -50,9 +56,10 @@ var (

func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter:
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter:
return 1
case WorkerKeepAliveKeyAdapter, UpstreamSubTaskKeyAdapter:
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter:
return 2
}
return -1
Expand Down
4 changes: 2 additions & 2 deletions dm/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (t *testCommon) TestKeyAdapter(c *C) {
want: "/dm-worker/r/3132372e302e302e313a32333832",
},
{
keys: []string{"127.0.0.1:2382", "worker1"},
keys: []string{"worker1"},
adapter: WorkerKeepAliveKeyAdapter,
want: "/dm-worker/a/3132372e302e302e313a32333832/776f726b657231",
want: "/dm-worker/a/776f726b657231",
},
{
keys: []string{"mysql1"},
Expand Down
12 changes: 7 additions & 5 deletions dm/config/checker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type Duration struct {
}

// MarshalText hacks to satisfy the encoding.TextMarshaler interface
// For MarshalText, we should use (d Duration) which can be used by both pointer and instance
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.Duration.String()), nil
}

// UnmarshalText hacks to satisfy the encoding.TextUnmarshaler interface
func (d Duration) UnmarshalText(text []byte) error {
// For UnmarshalText, we should use (d *Duration) to change the value of this instance instead of the copy
func (d *Duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
Expand All @@ -47,10 +49,10 @@ type CheckerConfig struct {
BackoffRollback Duration `toml:"backoff-rollback" json:"backoff-rollback"`
BackoffMax Duration `toml:"backoff-max" json:"backoff-max"`
// unexpose config
CheckInterval Duration `json:"-"`
BackoffMin Duration `json:"-"`
BackoffJitter bool `json:"-"`
BackoffFactor float64 `json:"-"`
CheckInterval Duration `toml:"check-interval" json:"-"`
BackoffMin Duration `toml:"backoff-min" json:"-"`
BackoffJitter bool `toml:"backoff-jitter" json:"-"`
BackoffFactor float64 `toml:"backoff-factor" json:"-"`
}

// Adjust sets default value for field: CheckInterval/BackoffMin/BackoffJitter/BackoffFactor
Expand Down
45 changes: 23 additions & 22 deletions dm/config/mysql_config.go → dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/pkg/utils"
"github.com/siddontang/go-mysql/mysql"
)

const (
Expand All @@ -39,8 +40,8 @@ type PurgeConfig struct {
RemainSpace int64 `toml:"remain-space" json:"remain-space"` // if remain space in @RelayBaseDir less than @RemainSpace (GB), then it can be purged
}

// MysqlConfig is the configuration for Worker
type MysqlConfig struct {
// SourceConfig is the configuration for Worker
type SourceConfig struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
Expand Down Expand Up @@ -69,9 +70,9 @@ type MysqlConfig struct {
ServerID uint32 `toml:"server-id" json:"server-id"`
}

// NewMysqlConfig creates a new base config for worker.
func NewMysqlConfig() *MysqlConfig {
c := &MysqlConfig{
// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
RelayDir: "relay-dir",
Purge: PurgeConfig{
Interval: 60 * 60,
Expand All @@ -95,14 +96,14 @@ func NewMysqlConfig() *MysqlConfig {
}

// Clone clones a config
func (c *MysqlConfig) Clone() *MysqlConfig {
clone := &MysqlConfig{}
func (c *SourceConfig) Clone() *SourceConfig {
clone := &SourceConfig{}
*clone = *c
return clone
}

// Toml returns TOML format representation of config
func (c *MysqlConfig) Toml() (string, error) {
func (c *SourceConfig) Toml() (string, error) {
var b bytes.Buffer

err := toml.NewEncoder(&b).Encode(c)
Expand All @@ -114,36 +115,36 @@ func (c *MysqlConfig) Toml() (string, error) {
}

// Parse parses flag definitions from the argument list.
func (c *MysqlConfig) Parse(content string) error {
func (c *SourceConfig) Parse(content string) error {
// Parse first to get config file.
metaData, err := toml.Decode(content, c)
return c.check(&metaData, err)
}

// EncodeToml encodes config.
func (c *MysqlConfig) EncodeToml() (string, error) {
func (c *SourceConfig) EncodeToml() (string, error) {
buf := new(bytes.Buffer)
if err := toml.NewEncoder(buf).Encode(c); err != nil {
return "", err
}
return buf.String(), nil
}

func (c *MysqlConfig) String() string {
func (c *SourceConfig) String() string {
cfg, err := json.Marshal(c)
if err != nil {
log.L().Error("fail to marshal config to json", log.ShortError(err))
}
return string(cfg)
}

func (c *MysqlConfig) adjust() {
func (c *SourceConfig) adjust() {
c.From.Adjust()
c.Checker.Adjust()
}

// Verify verifies the config
func (c *MysqlConfig) Verify() error {
func (c *SourceConfig) Verify() error {
if len(c.SourceID) == 0 {
return terror.ErrWorkerNeedSourceID.Generate()
}
Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *MysqlConfig) Verify() error {
}

// DecryptPassword returns a decrypted config replica in config
func (c *MysqlConfig) DecryptPassword() (*MysqlConfig, error) {
func (c *SourceConfig) DecryptPassword() (*SourceConfig, error) {
clone := c.Clone()
var (
pswdFrom string
Expand All @@ -192,7 +193,7 @@ func (c *MysqlConfig) DecryptPassword() (*MysqlConfig, error) {
}

// GenerateDBConfig creates DBConfig for DB
func (c *MysqlConfig) GenerateDBConfig() (*DBConfig, error) {
func (c *SourceConfig) GenerateDBConfig() (*DBConfig, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
Expand All @@ -203,8 +204,8 @@ func (c *MysqlConfig) GenerateDBConfig() (*DBConfig, error) {
return from, nil
}

// Adjust flavor and serverid of MysqlConfig
func (c *MysqlConfig) Adjust(db *sql.DB) (err error) {
// Adjust flavor and serverid of SourceConfig
func (c *SourceConfig) Adjust(db *sql.DB) (err error) {
c.From.Adjust()
c.Checker.Adjust()

Expand All @@ -227,7 +228,7 @@ func (c *MysqlConfig) Adjust(db *sql.DB) (err error) {
}

// AdjustFlavor adjust Flavor from DB
func (c *MysqlConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error) {
func (c *SourceConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -245,7 +246,7 @@ func (c *MysqlConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error)
}

// AdjustServerID adjust server id from DB
func (c *MysqlConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}
Expand Down Expand Up @@ -273,12 +274,12 @@ func (c *MysqlConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
}

// LoadFromFile loads config from file.
func (c *MysqlConfig) LoadFromFile(path string) error {
func (c *SourceConfig) LoadFromFile(path string) error {
metaData, err := toml.DecodeFile(path, c)
return c.check(&metaData, err)
}

func (c *MysqlConfig) check(metaData *toml.MetaData, err error) error {
func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
if err != nil {
return terror.ErrWorkerDecodeConfigFromFile.Delegate(err)
}
Expand Down
Loading