Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validation(dm): manage validator life cycle #4479

Merged
merged 42 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b19e8f6
add continuous validation config
D3Hunter Jan 18, 2022
c38ab53
add validator: ha and basic code
D3Hunter Jan 24, 2022
a8efd33
fix lint
D3Hunter Jan 24, 2022
225149f
remove comments
D3Hunter Jan 24, 2022
ad8222d
release resource on close
D3Hunter Jan 24, 2022
f23dea7
done on finish
D3Hunter Jan 24, 2022
3e08018
add some test case
D3Hunter Jan 24, 2022
c18fadd
manage validator in scheduler
D3Hunter Jan 25, 2022
95ea179
fix
D3Hunter Jan 25, 2022
3d8a588
ut for validator observer
D3Hunter Jan 25, 2022
df77e31
rename op
D3Hunter Jan 25, 2022
57c7850
validator start with stage to support ha
D3Hunter Jan 25, 2022
658f878
adjust, check on start
D3Hunter Jan 26, 2022
e3c7f29
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Jan 26, 2022
161ed52
fix from self-review
D3Hunter Jan 26, 2022
e562a6e
adjust validator config in subtask.adjust
D3Hunter Jan 26, 2022
cd1cf0c
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Jan 26, 2022
1fdc5a6
fix ut
D3Hunter Jan 26, 2022
9fd05dc
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Jan 26, 2022
85e9607
fix integration test
D3Hunter Jan 27, 2022
d43aadb
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Jan 27, 2022
d3f9222
fix integration test
D3Hunter Jan 27, 2022
1564f50
to read lock
D3Hunter Jan 27, 2022
4912d86
Merge branch 'master' into fast-validator
D3Hunter Jan 28, 2022
5860901
Merge branch 'master' into fast-validator
D3Hunter Feb 7, 2022
5a17acf
fix comments
D3Hunter Feb 8, 2022
23f669a
Merge remote-tracking branch 'origin/fast-validator' into fast-validator
D3Hunter Feb 8, 2022
3434024
Merge branch 'master' into fast-validator
D3Hunter Feb 8, 2022
47f24bb
Merge branch 'master' into fast-validator
D3Hunter Feb 9, 2022
3d67de7
fix comments
D3Hunter Feb 9, 2022
bc86e59
Merge remote-tracking branch 'origin/fast-validator' into fast-validator
D3Hunter Feb 9, 2022
3ac5905
get latest stage on st.Run
D3Hunter Feb 9, 2022
fb9ab60
fix ut caused by unadjusted cfg
D3Hunter Feb 10, 2022
f61529d
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Feb 10, 2022
b03f047
fix lint
D3Hunter Feb 10, 2022
50ced31
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Feb 10, 2022
f660c50
Update dm/syncer/data_validator.go
D3Hunter Feb 10, 2022
f2f00fb
Merge branch 'master' into fast-validator
D3Hunter Feb 10, 2022
875d50b
add comments, fix when validator observer observes new subtask event …
D3Hunter Feb 11, 2022
1bcd66a
Merge remote-tracking branch 'upstream/master' into fast-validator
D3Hunter Feb 14, 2022
567b305
fix lint
D3Hunter Feb 14, 2022
2e4bb60
Merge branch 'master' into fast-validator
D3Hunter Feb 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ ErrOpenAPITaskConfigNotExist,[code=20051:class=config:scope=internal:level=low],
ErrConfigCollationCompatibleNotSupport,[code=20052:class=config:scope=internal:level=medium], "Message: collation compatible %s not supported, Workaround: Please check the `collation_compatible` config in task configuration file, which can be set to `loose`/`strict`."
ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium], "Message: invalid load mode '%s', Workaround: Please choose a valid value in ['sql', 'loader']"
ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']"
ErrConfigValidationMode,[code=20055:class=config:scope=internal:level=high], "Message: invalid validation mode, Workaround: Please check `validation-mode` config in task configuration file."
ErrContinuousValidatorCfgNotFound,[code=20056:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s continuous validator config %s not exist, Workaround: Please check the `continuous-validator-config-name` config in task configuration file."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
5 changes: 4 additions & 1 deletion dm/dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ var (
// StageSubTaskKeyAdapter is used to store the running stage of the subtask.
// k/v: Encode(source-id, task-name) -> the running stage of the subtask.
StageSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/subtask/")
// StageValidatorKeyAdapter is used to store the running stage of the validator.
// k/v: Encode(source-id, task-name) -> the running stage of the validator.
StageValidatorKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/validator/")

// ShardDDLPessimismInfoKeyAdapter is used to store shard DDL info in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL info.
Expand Down Expand Up @@ -112,7 +115,7 @@ func keyAdapterKeysLen(s KeyAdapter) int {
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, OpenAPITaskTemplateKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter, StageValidatorKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
ShardDDLOptimismSourceTablesKeyAdapter, LoadTaskKeyAdapter, TaskCliArgsKeyAdapter:
return 2
Expand Down
4 changes: 4 additions & 0 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type SubTaskConfig struct {
MydumperConfig // Mydumper configuration
LoaderConfig // Loader configuration
SyncerConfig // Syncer configuration
ValidatorCfg ValidatorConfig

// compatible with standalone dm unit
LogLevel string `toml:"log-level" json:"log-level"`
Expand Down Expand Up @@ -442,6 +443,9 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if err := c.LoaderConfig.adjust(); err != nil {
return err
}
if err := c.ValidatorCfg.adjust(); err != nil {
return err
}

// TODO: check every member
// TODO: since we checked here, we could remove other terror like ErrSyncerUnitGenBAList
Expand Down
10 changes: 10 additions & 0 deletions dm/dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"

"github.com/pingcap/tiflow/dm/pkg/terror"
)

func (t *testConfig) TestSubTask(c *C) {
Expand Down Expand Up @@ -69,6 +71,14 @@ func (t *testConfig) TestSubTask(c *C) {

err = cfg.Adjust(true)
c.Assert(err, IsNil)

cfg.ValidatorCfg = ValidatorConfig{Mode: ValidationFast}
err = cfg.Adjust(true)
c.Assert(err, IsNil)

cfg.ValidatorCfg = ValidatorConfig{Mode: "invalid-mode"}
err = cfg.Adjust(true)
c.Assert(terror.ErrConfigValidationMode.Equal(err), IsTrue)
}

func (t *testConfig) TestSubTaskAdjustFail(c *C) {
Expand Down
61 changes: 57 additions & 4 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ const (
StrictCollationCompatible = "strict"
)

const (
ValidationNone = "none"
ValidationFast = "fast"
ValidationFull = "full"
)

// default config item values.
var (
// TaskConfig.
Expand Down Expand Up @@ -136,6 +142,9 @@ type MySQLInstance struct {
Syncer *SyncerConfig `yaml:"syncer"`
// SyncerThread is alias for WorkerCount in SyncerConfig, and its priority is higher than WorkerCount
SyncerThread int `yaml:"syncer-thread"`

ContinuousValidatorConfigName string `yaml:"continuous-validator-config-name"`
ContinuousValidator ValidatorConfig `yaml:"-"`
}

// VerifyAndAdjust does verification on configs, and adjust some configs.
Expand Down Expand Up @@ -327,6 +336,26 @@ func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}

type ValidatorConfig struct {
Mode string `yaml:"mode" toml:"mode" json:"mode"`
}

func (v *ValidatorConfig) adjust() error {
if v.Mode == "" {
v.Mode = ValidationNone
}
if v.Mode != ValidationNone && v.Mode != ValidationFast && v.Mode != ValidationFull {
return terror.ErrConfigValidationMode
}
return nil
}

func defaultValidatorConfig() ValidatorConfig {
return ValidatorConfig{
Mode: ValidationNone,
}
}

// TaskConfig is the configuration for Task.
type TaskConfig struct {
*flag.FlagSet `yaml:"-" toml:"-" json:"-"`
Expand Down Expand Up @@ -377,9 +406,10 @@ type TaskConfig struct {
BWList map[string]*filter.Rules `yaml:"black-white-list" toml:"black-white-list" json:"black-white-list"`
BAList map[string]*filter.Rules `yaml:"block-allow-list" toml:"block-allow-list" json:"block-allow-list"`

Mydumpers map[string]*MydumperConfig `yaml:"mydumpers" toml:"mydumpers" json:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders" toml:"loaders" json:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers" toml:"mydumpers" json:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders" toml:"loaders" json:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`
Validators map[string]*ValidatorConfig `yaml:"validators" toml:"validators" json:"validators"`
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved

CleanDumpFile bool `yaml:"clean-dump-file" toml:"clean-dump-file" json:"clean-dump-file"`
// deprecated
Expand Down Expand Up @@ -413,6 +443,7 @@ func NewTaskConfig() *TaskConfig {
Mydumpers: make(map[string]*MydumperConfig),
Loaders: make(map[string]*LoaderConfig),
Syncers: make(map[string]*SyncerConfig),
Validators: make(map[string]*ValidatorConfig),
CleanDumpFile: true,
CollationCompatible: defaultCollationCompatible,
}
Expand Down Expand Up @@ -470,7 +501,7 @@ func (c *TaskConfig) RawDecode(data string) error {
}

// find unused items in config.
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter"}
var configRefPrefixes = []string{"RouteRules", "FilterRules", "ColumnMappingRules", "Mydumper", "Loader", "Syncer", "ExprFilter", "Validator"}

const (
routeRulesIdx = iota
Expand All @@ -480,6 +511,7 @@ const (
loaderIdx
syncerIdx
exprFilterIdx
validatorIdx
)

// adjust adjusts and verifies config.
Expand Down Expand Up @@ -562,6 +594,12 @@ func (c *TaskConfig) adjust() error {
}
}

for _, validatorCfg := range c.Validators {
if err := validatorCfg.adjust(); err != nil {
return err
}
}

instanceIDs := make(map[string]int) // source-id -> instance-index
globalConfigReferCount := map[string]int{}
duplicateErrorStrings := make([]string, 0)
Expand Down Expand Up @@ -684,6 +722,16 @@ func (c *TaskConfig) adjust() error {
inst.Syncer.WorkerCount = inst.SyncerThread
}

inst.ContinuousValidator = defaultValidatorConfig()
if inst.ContinuousValidatorConfigName != "" {
rule, ok := c.Validators[inst.ContinuousValidatorConfigName]
if !ok {
return terror.ErrContinuousValidatorCfgNotFound.Generate(i, inst.ContinuousValidatorConfigName)
}
globalConfigReferCount[configRefPrefixes[validatorIdx]+inst.ContinuousValidatorConfigName]++
inst.ContinuousValidator = *rule
}

// for backward compatible, set global config `ansi-quotes: true` if any syncer is true
if inst.Syncer.EnableANSIQuotes {
log.L().Warn("DM could discover proper ANSI_QUOTES, `enable-ansi-quotes` is no longer take effect")
Expand Down Expand Up @@ -755,6 +803,11 @@ func (c *TaskConfig) adjust() error {
unusedConfigs = append(unusedConfigs, exprFilter)
}
}
for key := range c.Validators {
if globalConfigReferCount[configRefPrefixes[validatorIdx]+key] == 0 {
unusedConfigs = append(unusedConfigs, key)
}
}

if len(unusedConfigs) != 0 {
sort.Strings(unusedConfigs)
Expand Down
32 changes: 20 additions & 12 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*
cfg.MydumperConfig = *inst.Mydumper
cfg.LoaderConfig = *inst.Loader
cfg.SyncerConfig = *inst.Syncer
cfg.ValidatorCfg = inst.ContinuousValidator

cfg.CleanDumpFile = c.CleanDumpFile

Expand Down Expand Up @@ -205,6 +206,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.SyncerConfig.Batch = *incrCfg.ReplBatch
}
}
subTaskCfg.ValidatorCfg = defaultValidatorConfig()
// set route,blockAllowList,filter config
doCnt := len(tableMigrateRuleMap[sourceCfg.SourceName])
doDBs := make([]string, doCnt)
Expand Down Expand Up @@ -302,6 +304,7 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.Syncers = make(map[string]*SyncerConfig)
c.ExprFilter = make(map[string]*ExpressionFilter)
c.Experimental = stCfg0.Experimental
c.Validators = make(map[string]*ValidatorConfig)

baListMap := make(map[string]string, len(stCfgs))
routeMap := make(map[string]string, len(stCfgs))
Expand All @@ -311,8 +314,9 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
syncMap := make(map[string]string, len(stCfgs))
cmMap := make(map[string]string, len(stCfgs))
exprFilterMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, cmIdx, efIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, cmName, efName string
validatorMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, validateIdx, cmIdx, efIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, validateName, cmName, efName string

// NOTE:
// - we choose to ref global configs for instances now.
Expand Down Expand Up @@ -354,6 +358,9 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
c.ExprFilter[efName] = f
}

validateName, validateIdx = getGenerateName(stCfg.ValidatorCfg, validateIdx, "validator", validatorMap)
c.Validators[validateName] = &stCfg.ValidatorCfg

cmNames := make([]string, 0, len(stCfg.ColumnMappingRules))
for _, rule := range stCfg.ColumnMappingRules {
cmName, cmIdx = getGenerateName(rule, cmIdx, "cm", cmMap)
Expand All @@ -362,16 +369,17 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
}

c.MySQLInstances = append(c.MySQLInstances, &MySQLInstance{
SourceID: stCfg.SourceID,
Meta: stCfg.Meta,
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
ExpressionFilters: exprFilterNames,
SourceID: stCfg.SourceID,
Meta: stCfg.Meta,
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
ExpressionFilters: exprFilterNames,
ContinuousValidatorConfigName: validateName,
})
}
if c.CollationCompatible == "" {
Expand Down
15 changes: 14 additions & 1 deletion dm/dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Table: "tbl",
DeleteValueExpr: "state = 1",
}
validatorCfg = ValidatorConfig{Mode: ValidationNone}
source1DBCfg = DBConfig{
Host: "127.0.0.1",
Port: 3306,
Expand Down Expand Up @@ -705,6 +706,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
EnableGTID: true,
SafeMode: true,
},
ValidatorCfg: validatorCfg,
CleanDumpFile: true,
EnableANSIQuotes: true,
}
Expand All @@ -723,6 +725,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
stCfg2.BAList = &baList2
stCfg2.RouteRules = []*router.TableRule{&routeRule4, &routeRule1, &routeRule2}
stCfg2.ExprFilter = []*ExpressionFilter{&exprFilter1}
stCfg2.ValidatorCfg.Mode = ValidationFast

cfg := SubTaskConfigsToTaskConfig(stCfg1, stCfg2)

Expand Down Expand Up @@ -757,6 +760,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
SyncerConfigName: "sync-01",
Syncer: nil,
SyncerThread: 0,

ContinuousValidatorConfigName: "validator-01",
},
{
SourceID: source2,
Expand All @@ -776,6 +781,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Syncer: nil,
SyncerThread: 0,
ExpressionFilters: []string{"expr-filter-01"},

ContinuousValidatorConfigName: "validator-02",
},
},
OnlineDDL: onlineDDL,
Expand Down Expand Up @@ -807,6 +814,12 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
ExprFilter: map[string]*ExpressionFilter{
"expr-filter-01": &exprFilter1,
},
Validators: map[string]*ValidatorConfig{
"validator-01": &validatorCfg,
"validator-02": {
Mode: ValidationFast,
},
},
CleanDumpFile: stCfg1.CleanDumpFile,
}
cfg2.Experimental.AsyncCheckpointFlush = true
Expand Down Expand Up @@ -1031,7 +1044,7 @@ func (t *testConfig) TestTaskConfigForDowngrade(c *C) {
// make sure all new field were added
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+3) // without flag, collation_compatible and experimental
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+4) // without flag, collation_compatible, experimental, validator

// make sure all field were copied
cfgForClone := &TaskConfigForDowngrade{}
Expand Down
Loading