Skip to content

Commit

Permalink
validation(dm): manage validator life cycle (#4479)
Browse files Browse the repository at this point in the history
close #4459
  • Loading branch information
D3Hunter authored Feb 14, 2022
1 parent c3e8be9 commit 52f0ade
Show file tree
Hide file tree
Showing 35 changed files with 1,307 additions and 311 deletions.
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low],
ErrConfigCollationCompatibleNotSupport,[code=20052:class=config:scope=internal:level=medium], "Message: collation compatible %s not supported, Workaround: Please check the `collation_compatible` config in task configuration file, which can be set to `loose`/`strict`."
ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium], "Message: invalid load mode '%s', Workaround: Please choose a valid value in ['sql', 'loader']"
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']"
ErrConfigValidationMode,[code=20055:class=config:scope=internal:level=high], "Message: invalid validation mode, Workaround: Please check `validation-mode` config in task configuration file."
ErrContinuousValidatorCfgNotFound,[code=20056:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s continuous validator config %s not exist, Workaround: Please check the `continuous-validator-config-name` config in task configuration file."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ var (
// StageSubTaskKeyAdapter is used to store the running stage of the subtask.
// k/v: Encode(source-id, task-name) -> the running stage of the subtask.
StageSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/subtask/")
// StageValidatorKeyAdapter is used to store the running stage of the validator.
// k/v: Encode(source-id, task-name) -> the running stage of the validator.
StageValidatorKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/validator/")

// ShardDDLPessimismInfoKeyAdapter is used to store shard DDL info in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL info.
Expand Down Expand Up @@ -112,7 +115,7 @@ func keyAdapterKeysLen(s KeyAdapter) int {
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, OpenAPITaskTemplateKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter, StageValidatorKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
ShardDDLOptimismSourceTablesKeyAdapter, LoadTaskKeyAdapter, TaskCliArgsKeyAdapter:
return 2
Expand Down
4 changes: 4 additions & 0 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type SubTaskConfig struct {
MydumperConfig // Mydumper configuration
LoaderConfig // Loader configuration
SyncerConfig // Syncer configuration
ValidatorCfg ValidatorConfig

// compatible with standalone dm unit
LogLevel string `toml:"log-level" json:"log-level"`
Expand Down Expand Up @@ -442,6 +443,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if err := c.LoaderConfig.adjust(); err != nil {
return err
}
if err := c.ValidatorCfg.adjust(); err != nil {
return err
}

// TODO: check every member
// TODO: since we checked here, we could remove other terror like ErrSyncerUnitGenBAList
Expand Down
10 changes: 10 additions & 0 deletions dm/dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/tiflow/dm/pkg/terror"
)

func (t *testConfig) TestSubTask(c *C) {
Expand Down Expand Up @@ -69,6 +71,14 @@ func (t *testConfig) TestSubTask(c *C) {

err = cfg.Adjust(true)
c.Assert(err, IsNil)

cfg.ValidatorCfg = ValidatorConfig{Mode: ValidationFast}
err = cfg.Adjust(true)
c.Assert(err, IsNil)

cfg.ValidatorCfg = ValidatorConfig{Mode: "invalid-mode"}
err = cfg.Adjust(true)
c.Assert(terror.ErrConfigValidationMode.Equal(err), IsTrue)
}

func (t *testConfig) TestSubTaskAdjustFail(c *C) {
Expand Down
61 changes: 57 additions & 4 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ const (
StrictCollationCompatible = "strict"
)

const (
ValidationNone = "none"
ValidationFast = "fast"
ValidationFull = "full"
)

// default config item values.
var (
// TaskConfig.
Expand Down Expand Up @@ -136,6 +142,9 @@ type MySQLInstance struct {
Syncer *SyncerConfig `yaml:"syncer"`
// SyncerThread is alias for WorkerCount in SyncerConfig, and its priority is higher than WorkerCount
SyncerThread int `yaml:"syncer-thread"`

ContinuousValidatorConfigName string `yaml:"continuous-validator-config-name"`
ContinuousValidator ValidatorConfig `yaml:"-"`
}

// VerifyAndAdjust does verification on configs, and adjust some configs.
Expand Down Expand Up @@ -327,6 +336,26 @@ func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

type ValidatorConfig struct {
Mode string `yaml:"mode" toml:"mode" json:"mode"`
}

func (v *ValidatorConfig) adjust() error {
if v.Mode == "" {
v.Mode = ValidationNone
}
if v.Mode != ValidationNone && v.Mode != ValidationFast && v.Mode != ValidationFull {
return terror.ErrConfigValidationMode
}
return nil
}

func defaultValidatorConfig() ValidatorConfig {
return ValidatorConfig{
Mode: ValidationNone,
}
}

// TaskConfig is the configuration for Task.
type TaskConfig struct {
*flag.FlagSet `yaml:"-" toml:"-" json:"-"`
Expand Down Expand Up @@ -377,9 +406,10 @@ type TaskConfig struct {
BWList map[string]*filter.Rules `yaml:"black-white-list" toml:"black-white-list" json:"black-white-list"`
BAList map[string]*filter.Rules `yaml:"block-allow-list" toml:"block-allow-list" json:"block-allow-list"`

Mydumpers map[string]*MydumperConfig `yaml:"mydumpers" toml:"mydumpers" json:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders" toml:"loaders" json:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers" toml:"mydumpers" json:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders" toml:"loaders" json:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`
Validators map[string]*ValidatorConfig `yaml:"validators" toml:"validators" json:"validators"`

CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"`
// deprecated
Expand Down Expand Up @@ -413,6 +443,7 @@ func NewTaskConfig() *TaskConfig {
Mydumpers: make(map[string]*MydumperConfig),
Loaders: make(map[string]*LoaderConfig),
Syncers: make(map[string]*SyncerConfig),
Validators: make(map[string]*ValidatorConfig),
CleanDumpFile: true,
CollationCompatible: defaultCollationCompatible,
}
Expand Down Expand Up @@ -470,7 +501,7 @@ func (c *TaskConfig) RawDecode(data string) error {
}

// find unused items in config.
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter"}
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}

const (
routeRulesIdx = iota
Expand All @@ -480,6 +511,7 @@ const (
loaderIdx
syncerIdx
exprFilterIdx
validatorIdx
)

// adjust adjusts and verifies config.
Expand Down Expand Up @@ -562,6 +594,12 @@ func (c *TaskConfig) adjust() error {
}
}

for _, validatorCfg := range c.Validators {
if err := validatorCfg.adjust(); err != nil {
return err
}
}

instanceIDs := make(map[string]int) // source-id -> instance-index
globalConfigReferCount := map[string]int{}
duplicateErrorStrings := make([]string, 0)
Expand Down Expand Up @@ -684,6 +722,16 @@ func (c *TaskConfig) adjust() error {
inst.Syncer.WorkerCount = inst.SyncerThread
}

inst.ContinuousValidator = defaultValidatorConfig()
if inst.ContinuousValidatorConfigName != "" {
rule, ok := c.Validators[inst.ContinuousValidatorConfigName]
if !ok {
return terror.ErrContinuousValidatorCfgNotFound.Generate(i, inst.ContinuousValidatorConfigName)
}
globalConfigReferCount[configRefPrefixes[validatorIdx]+inst.ContinuousValidatorConfigName]++
inst.ContinuousValidator = *rule
}

// for backward compatible, set global config `ansi-quotes: true` if any syncer is true
if inst.Syncer.EnableANSIQuotes {
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
Expand Down Expand Up @@ -755,6 +803,11 @@ func (c *TaskConfig) adjust() error {
unusedConfigs = append(unusedConfigs, exprFilter)
}
}
for key := range c.Validators {
if globalConfigReferCount[configRefPrefixes[validatorIdx]+key] == 0 {
unusedConfigs = append(unusedConfigs, key)
}
}

if len(unusedConfigs) != 0 {
sort.Strings(unusedConfigs)
Expand Down
32 changes: 20 additions & 12 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*
cfg.MydumperConfig = *inst.Mydumper
cfg.LoaderConfig = *inst.Loader
cfg.SyncerConfig = *inst.Syncer
cfg.ValidatorCfg = inst.ContinuousValidator

cfg.CleanDumpFile = c.CleanDumpFile

Expand Down Expand Up @@ -205,6 +206,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.SyncerConfig.Batch = *incrCfg.ReplBatch
}
}
subTaskCfg.ValidatorCfg = defaultValidatorConfig()
// set route,blockAllowList,filter config
doCnt := len(tableMigrateRuleMap[sourceCfg.SourceName])
doDBs := make([]string, doCnt)
Expand Down Expand Up @@ -302,6 +304,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.Syncers = make(map[string]*SyncerConfig)
c.ExprFilter = make(map[string]*ExpressionFilter)
c.Experimental = stCfg0.Experimental
c.Validators = make(map[string]*ValidatorConfig)

baListMap := make(map[string]string, len(stCfgs))
routeMap := make(map[string]string, len(stCfgs))
Expand All @@ -311,8 +314,9 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
syncMap := make(map[string]string, len(stCfgs))
cmMap := make(map[string]string, len(stCfgs))
exprFilterMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, cmIdx, efIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, cmName, efName string
validatorMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, validateIdx, cmIdx, efIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, validateName, cmName, efName string

// NOTE:
// - we choose to ref global configs for instances now.
Expand Down Expand Up @@ -354,6 +358,9 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.ExprFilter[efName] = f
}

validateName, validateIdx = getGenerateName(stCfg.ValidatorCfg, validateIdx, "validator", validatorMap)
c.Validators[validateName] = &stCfg.ValidatorCfg

cmNames := make([]string, 0, len(stCfg.ColumnMappingRules))
for _, rule := range stCfg.ColumnMappingRules {
cmName, cmIdx = getGenerateName(rule, cmIdx, "cm", cmMap)
Expand All @@ -362,16 +369,17 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
}

c.MySQLInstances = append(c.MySQLInstances, &MySQLInstance{
SourceID: stCfg.SourceID,
Meta: stCfg.Meta,
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
ExpressionFilters: exprFilterNames,
SourceID: stCfg.SourceID,
Meta: stCfg.Meta,
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
ExpressionFilters: exprFilterNames,
ContinuousValidatorConfigName: validateName,
})
}
if c.CollationCompatible == "" {
Expand Down
15 changes: 14 additions & 1 deletion dm/dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Table: "tbl",
DeleteValueExpr: "state = 1",
}
validatorCfg = ValidatorConfig{Mode: ValidationNone}
source1DBCfg = DBConfig{
Host: "127.0.0.1",
Port: 3306,
Expand Down Expand Up @@ -705,6 +706,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
EnableGTID: true,
SafeMode: true,
},
ValidatorCfg: validatorCfg,
CleanDumpFile: true,
EnableANSIQuotes: true,
}
Expand All @@ -723,6 +725,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
stCfg2.BAList = &baList2
stCfg2.RouteRules = []*router.TableRule{&routeRule4, &routeRule1, &routeRule2}
stCfg2.ExprFilter = []*ExpressionFilter{&exprFilter1}
stCfg2.ValidatorCfg.Mode = ValidationFast

cfg := SubTaskConfigsToTaskConfig(stCfg1, stCfg2)

Expand Down Expand Up @@ -757,6 +760,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
SyncerConfigName: "sync-01",
Syncer: nil,
SyncerThread: 0,

ContinuousValidatorConfigName: "validator-01",
},
{
SourceID: source2,
Expand All @@ -776,6 +781,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Syncer: nil,
SyncerThread: 0,
ExpressionFilters: []string{"expr-filter-01"},

ContinuousValidatorConfigName: "validator-02",
},
},
OnlineDDL: onlineDDL,
Expand Down Expand Up @@ -807,6 +814,12 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
ExprFilter: map[string]*ExpressionFilter{
"expr-filter-01": &exprFilter1,
},
Validators: map[string]*ValidatorConfig{
"validator-01": &validatorCfg,
"validator-02": {
Mode: ValidationFast,
},
},
CleanDumpFile: stCfg1.CleanDumpFile,
}
cfg2.Experimental.AsyncCheckpointFlush = true
Expand Down Expand Up @@ -1031,7 +1044,7 @@ func (t *testConfig) TestTaskConfigForDowngrade(c *C) {
// make sure all new field were added
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+3) // without flag, collation_compatible and experimental
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+4) // without flag, collation_compatible, experimental, validator

// make sure all field were copied
cfgForClone := &TaskConfigForDowngrade{}
Expand Down
Loading

0 comments on commit 52f0ade

Please sign in to comment.