Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker(dm): support concurrent check #3975

Merged
merged 73 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from 65 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
47ad3b2
save work
okJiang Dec 14, 2021
7ac9074
add tableChecker concurrency
okJiang Dec 14, 2021
f71f08e
support shard table check
okJiang Dec 14, 2021
c237757
Merge branch 'master' into pre-check-concurrency
okJiang Dec 20, 2021
ba16901
add manual test
okJiang Dec 21, 2021
6532407
save work
okJiang Dec 22, 2021
ae9bf75
save work
okJiang Dec 23, 2021
caf9208
fix deadlock
okJiang Dec 23, 2021
bf58d51
save work
okJiang Dec 23, 2021
e46d1e4
fix
okJiang Dec 23, 2021
00497b7
Merge branch 'master' into pre-check-concurrency
okJiang Dec 23, 2021
9462820
fix verify
okJiang Dec 23, 2021
7975181
make check-static
okJiang Dec 23, 2021
105c709
address comment and adjust the concurrency gradient
okJiang Dec 24, 2021
d8a95c1
add max_connections
okJiang Dec 24, 2021
ac72696
address commen:
okJiang Dec 28, 2021
99d9611
save work
okJiang Dec 29, 2021
4927461
address some comments
okJiang Jan 4, 2022
ca69636
remove manual test
okJiang Jan 4, 2022
1f4d1e6
address comment
okJiang Jan 4, 2022
f380676
address comment
okJiang Jan 4, 2022
383975d
Merge branch 'master' into pre-check-concurrency
okJiang Jan 10, 2022
aafd02c
Merge branch 'master' of github.com:okJiang/ticdc into pre-check-conc…
okJiang Jan 10, 2022
b8b1535
Merge branch 'pre-check-concurrency' of github.com:okJiang/ticdc into…
okJiang Jan 10, 2022
b1220b2
revert error handle
okJiang Jan 10, 2022
7fbc334
address comment
okJiang Jan 12, 2022
d86e107
save work
okJiang Jan 12, 2022
a90faaa
save work
okJiang Jan 12, 2022
5cbb989
Update dm/pkg/checker/table_structure.go
okJiang Jan 12, 2022
837927b
fix
okJiang Jan 14, 2022
7090df9
Merge branch 'master' of github.com:pingcap/ticdc into pre-check-conc…
okJiang Jan 14, 2022
192c449
address comment
okJiang Jan 14, 2022
bb74055
Merge branch 'master' of github.com:pingcap/ticdc into pre-check-conc…
okJiang Jan 17, 2022
fa9a7c9
fix conflict
okJiang Jan 17, 2022
efe63f8
fix lint
okJiang Jan 17, 2022
691e42f
fix
okJiang Jan 17, 2022
ebf89a1
Merge branch 'master' into pre-check-concurrency
okJiang Jan 17, 2022
7a818ab
address comment
okJiang Jan 17, 2022
a2431c2
Merge branch 'pre-check-concurrency' of github.com:okJiang/ticdc into…
okJiang Jan 17, 2022
3381c0e
typo
okJiang Jan 17, 2022
060cf28
address comment
okJiang Jan 17, 2022
5b89993
fix test
okJiang Jan 17, 2022
3790a82
fix lint
okJiang Jan 17, 2022
584000d
instead instance of sourceID
okJiang Jan 24, 2022
5672b17
address comment
okJiang Jan 24, 2022
5f7cb60
address comment(gmh)
okJiang Jan 24, 2022
7075883
fix
okJiang Jan 24, 2022
4c4ba79
wrap getCreateTableStmt
okJiang Jan 24, 2022
3c45ef3
Merge branch 'master' of github.com:pingcap/ticdc into pre-check-conc…
okJiang Jan 26, 2022
9258eb2
Merge branch 'master' into pre-check-concurrency
okJiang Jan 26, 2022
568119f
Merge branch 'pre-check-concurrency' of github.com:okJiang/ticdc into…
okJiang Jan 26, 2022
a88438d
address comment
okJiang Jan 26, 2022
90442d1
address comment
okJiang Jan 26, 2022
a736a53
address comment: follow dumpThreads
okJiang Jan 26, 2022
e54e3c0
address comment: use errgroup
okJiang Jan 28, 2022
73a485b
address comment: add handleOpts
okJiang Jan 28, 2022
5caf9a7
address comment
okJiang Jan 28, 2022
994e410
Merge branch 'master' into pre-check-concurrency
okJiang Jan 28, 2022
23241c2
fix ut
okJiang Feb 7, 2022
901aecb
remove some ut
okJiang Feb 7, 2022
d1ff3ad
adjust auto increment key
okJiang Feb 7, 2022
2dff9bb
fix lint
okJiang Feb 7, 2022
c6e753c
address comment
okJiang Feb 8, 2022
1983d06
address comment(ehco)
okJiang Feb 11, 2022
e5c2571
address comment(GMH)
okJiang Feb 11, 2022
dfc8e81
address comment
okJiang Feb 11, 2022
2df639d
Merge branch 'master' into pre-check-concurrency
ti-chi-bot Feb 11, 2022
48723c0
Merge branch 'master' into pre-check-concurrency
ti-chi-bot Feb 11, 2022
7930508
Merge branch 'master' into pre-check-concurrency
ti-chi-bot Feb 11, 2022
a4ef70b
Merge branch 'master' into pre-check-concurrency
ti-chi-bot Feb 11, 2022
0b2051c
fix data race
okJiang Feb 12, 2022
67bb77b
Merge branch 'pre-check-concurrency' of github.com:okJiang/ticdc into…
okJiang Feb 12, 2022
4d91558
Merge branch 'master' into pre-check-concurrency
Ehco1996 Feb 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 51 additions & 36 deletions dm/checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,32 +255,33 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) {
}

createTable1 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
createTable2 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL,
UNIQUE KEY id (id)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL,
UNIQUE KEY id (id)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

mock := initMockDB(c)
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(len(msg), tc.Equals, 0)
c.Assert(err, tc.ErrorMatches, "(.|\n)*primary/unique key does not exist(.|\n)*")
c.Assert(len(msg), tc.Equals, 0)

mock = initMockDB(c)
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
c.Assert(err, tc.IsNil)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
}

func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
Expand All @@ -299,29 +300,37 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
}

createTable1 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) DEFAULT NULL,
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
createTable2 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

mock := initMockDB(c)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2)))
msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(len(msg), tc.Equals, 0)
c.Assert(err, tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*")
c.Assert(len(msg), tc.Equals, 0)

mock = initMockDB(c)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
c.Assert(err, tc.IsNil)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
}

func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
Expand All @@ -340,36 +349,42 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
}

createTable1 := `CREATE TABLE %s (
id int(11) NOT NULL AUTO_INCREMENT,
b int(11) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) NOT NULL AUTO_INCREMENT,
b int(11) DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

createTable2 := `CREATE TABLE %s (
id int(11) NOT NULL,
b int(11) DEFAULT NULL,
INDEX (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
id int(11) NOT NULL,
b int(11) DEFAULT NULL,
INDEX (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

mock := initMockDB(c)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
msg, err := CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(len(msg), tc.Equals, 0)
c.Assert(err, tc.ErrorMatches, "(.|\n)*instance table .* of sharding .* have auto-increment key(.|\n)*")
c.Assert(msg, tc.Matches, "(.|\n)*sourceID table .* of sharding .* have auto-increment key(.|\n)*")
c.Assert(err, tc.IsNil)

mock = conn.InitMockDB(c)
mock.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"DATABASE"}).AddRow(schema))
mock.ExpectQuery("SHOW FULL TABLES").WillReturnRows(sqlmock.NewRows([]string{"Tables_in_" + schema, "Table_type"}).AddRow(tb1, "BASE TABLE").AddRow(tb2, "BASE TABLE"))
mock = initMockDB(c)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb2)))
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
c.Assert(err, tc.IsNil)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
}

func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) {
Expand Down
41 changes: 19 additions & 22 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ func (c *Checker) Init(ctx context.Context) (err error) {
rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: c.closeDBs})

c.tctx = tcontext.NewContext(ctx, log.With(zap.String("unit", "task check")))
// target name => source => schema => [tables]
sharding := make(map[string]map[string]map[string][]string)
// targetTableID => source => [tables]
sharding := make(map[string]map[string][]*filter.Table)
shardingCounter := make(map[string]int)
// sourceID => []table
checkTablesMap := make(map[string][]*filter.Table)
dbs := make(map[string]*sql.DB)
columnMapping := make(map[string]*column.Mapping)
_, checkingShardID := c.checkingItems[config.ShardAutoIncrementIDChecking]
Expand Down Expand Up @@ -206,29 +208,22 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}

// checkTables map schema => {table1, table2, ...}
checkTables := make(map[string][]string)
var checkTables []*filter.Table
checkSchemas := make(map[string]struct{}, len(mapping))
for name, tables := range mapping {
for targetTableID, tables := range mapping {
checkTables = append(checkTables, tables...)
if _, ok := sharding[targetTableID]; !ok {
sharding[targetTableID] = make(map[string][]*filter.Table)
}
sharding[targetTableID][instance.cfg.SourceID] = append(sharding[targetTableID][instance.cfg.SourceID], tables...)
shardingCounter[targetTableID] += len(tables)
for _, table := range tables {
checkTables[table.Schema] = append(checkTables[table.Schema], table.Name)
if _, ok := checkSchemas[table.Schema]; !ok {
checkSchemas[table.Schema] = struct{}{}
}
if _, ok := sharding[name]; !ok {
sharding[name] = make(map[string]map[string][]string)
}
if _, ok := sharding[name][instance.cfg.SourceID]; !ok {
sharding[name][instance.cfg.SourceID] = make(map[string][]string)
}
if _, ok := sharding[name][instance.cfg.SourceID][table.Schema]; !ok {
sharding[name][instance.cfg.SourceID][table.Schema] = make([]string, 0, 1)
}

sharding[name][instance.cfg.SourceID][table.Schema] = append(sharding[name][instance.cfg.SourceID][table.Schema], table.Name)
shardingCounter[name]++
}
}
checkTablesMap[instance.cfg.SourceID] = checkTables
dbs[instance.cfg.SourceID] = instance.sourceDB.DB
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
exportCfg := export.DefaultConfig()
Expand All @@ -241,9 +236,11 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if c.onlineDDL != nil {
c.checkList = append(c.checkList, checker.NewOnlineDDLChecker(instance.sourceDB.DB, checkSchemas, c.onlineDDL, bw))
}
if checkSchema {
c.checkList = append(c.checkList, checker.NewTablesChecker(instance.sourceDB.DB, instance.sourceDBinfo, checkTables))
}
}

dumpThreads := c.instances[0].cfg.MydumperConfig.Threads
if checkSchema {
c.checkList = append(c.checkList, checker.NewTablesChecker(dbs, checkTablesMap, dumpThreads))
}

if checkingShard {
Expand All @@ -252,7 +249,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
continue
}

c.checkList = append(c.checkList, checker.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID))
c.checkList = append(c.checkList, checker.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
}
}

Expand Down
9 changes: 4 additions & 5 deletions dm/pkg/checker/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,12 @@ func (pc *MySQLServerIDChecker) Check(ctx context.Context) *Result {

serverID, err := dbutil.ShowServerID(ctx, pc.db)
if err != nil {
if utils.OriginError(err) == sql.ErrNoRows {
result.Errors = append(result.Errors, NewError("server_id not set"))
result.Instruction = "please set server_id in your database"
} else {
if utils.OriginError(err) != sql.ErrNoRows {
GMHDBJD marked this conversation as resolved.
Show resolved Hide resolved
markCheckError(result, err)
return result
}

result.Errors = append(result.Errors, NewError("server_id not set"))
result.Instruction = "please set server_id in your database"
return result
}

Expand Down
20 changes: 9 additions & 11 deletions dm/pkg/checker/privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -41,12 +42,12 @@ var privNeedGlobal = map[mysql.PrivilegeType]struct{}{
type SourceDumpPrivilegeChecker struct {
db *sql.DB
dbinfo *dbutil.DBConfig
checkTables map[string][]string // map schema => {table1, table2, ...}
checkTables []*filter.Table
consistency string
}

// NewSourceDumpPrivilegeChecker returns a RealChecker.
func NewSourceDumpPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig, checkTables map[string][]string, consistency string) RealChecker {
func NewSourceDumpPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig, checkTables []*filter.Table, consistency string) RealChecker {
return &SourceDumpPrivilegeChecker{
db: db,
dbinfo: dbinfo,
Expand Down Expand Up @@ -320,23 +321,20 @@ func verifyPrivileges(result *Result, grants []string, lackPriv map[mysql.Privil
return NewError(privileges)
}

// checkTables map schema => {table1, table2, ...}.
// lackPriv map privilege => schema => table.
func genExpectPriv(privileges map[mysql.PrivilegeType]struct{}, checkTables map[string][]string) map[mysql.PrivilegeType]map[string]map[string]struct{} {
func genExpectPriv(privileges map[mysql.PrivilegeType]struct{}, checkTables []*filter.Table) map[mysql.PrivilegeType]map[string]map[string]struct{} {
lackPriv := make(map[mysql.PrivilegeType]map[string]map[string]struct{}, len(privileges))
for p := range privileges {
if _, ok := privNeedGlobal[p]; ok {
lackPriv[p] = make(map[string]map[string]struct{})
continue
}
lackPriv[p] = make(map[string]map[string]struct{}, len(checkTables))
for schema, tables := range checkTables {
if _, ok := lackPriv[p][schema]; !ok {
lackPriv[p][schema] = make(map[string]struct{}, len(tables))
}
for _, table := range tables {
lackPriv[p][schema][table] = struct{}{}
for _, table := range checkTables {
if _, ok := lackPriv[p][table.Schema]; !ok {
lackPriv[p][table.Schema] = make(map[string]struct{})
}
lackPriv[p][table.Schema][table.Name] = struct{}{}
}
if p == mysql.SelectPriv {
if _, ok := lackPriv[p]["INFORMATION_SCHEMA"]; !ok {
Expand All @@ -353,7 +351,7 @@ func genReplicPriv(replicationPrivileges map[mysql.PrivilegeType]struct{}) map[m
return genExpectPriv(replicationPrivileges, nil)
}

func genDumpPriv(dumpPrivileges map[mysql.PrivilegeType]struct{}, checkTables map[string][]string) map[mysql.PrivilegeType]map[string]map[string]struct{} {
func genDumpPriv(dumpPrivileges map[mysql.PrivilegeType]struct{}, checkTables []*filter.Table) map[mysql.PrivilegeType]map[string]map[string]struct{} {
// due to dump privilege checker need check db/table level privilege
// so we need know the check tables
return genExpectPriv(dumpPrivileges, checkTables)
Expand Down
25 changes: 13 additions & 12 deletions dm/pkg/checker/privilege_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

tc "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb/parser/mysql"
)

Expand All @@ -31,7 +32,7 @@ type testCheckSuite struct{}
func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
cases := []struct {
grants []string
checkTables map[string][]string
checkTables []*filter.Table
dumpState State
errMatch string
}{
Expand All @@ -58,8 +59,8 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
"GRANT EXECUTE ON FUNCTION db1.anomaly_score TO 'user1'@'domain-or-ip-address1'",
},
dumpState: StateFailure,
checkTables: map[string][]string{
"db1": {"anomaly_score"},
checkTables: []*filter.Table{
{Schema: "db1", Name: "anomaly_score"},
},
// `db1`.`anomaly_score`; `INFORMATION_SCHEMA`
// can't guarantee the order
Expand Down Expand Up @@ -126,8 +127,8 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
"GRANT ALL PRIVILEGES ON `medz`.* TO `zhangsan`@`10.8.1.9` WITH GRANT OPTION",
},
dumpState: StateFailure,
checkTables: map[string][]string{
"medz": {"medz"},
checkTables: []*filter.Table{
{Schema: "medz", Name: "medz"},
},
errMatch: "lack of RELOAD privilege; ",
},
Expand All @@ -137,8 +138,8 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
"GRANT ALL PRIVILEGES ON `INFORMATION_SCHEMA`.* TO `zhangsan`@`10.8.1.9` WITH GRANT OPTION",
},
dumpState: StateFailure,
checkTables: map[string][]string{
"medz": {"medz"},
checkTables: []*filter.Table{
{Schema: "medz", Name: "medz"},
},
errMatch: "lack of RELOAD privilege; ",
},
Expand All @@ -149,8 +150,8 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
"GRANT SELECT ON `INFORMATION_SCHEMA`.* TO 'user'@'%'",
},
dumpState: StateFailure,
checkTables: map[string][]string{
"lance": {"t"},
checkTables: []*filter.Table{
{Schema: "lance", Name: "t"},
},
errMatch: "lack of Select privilege: {`lance`.`t`}; ",
},
Expand All @@ -162,8 +163,8 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
"GRANT `r1`@`%`,`r2`@`%` TO `u1`@`localhost`",
},
dumpState: StateSuccess,
checkTables: map[string][]string{
"db1": {"t"},
checkTables: []*filter.Table{
{Schema: "db1", Name: "t"},
},
},
{
Expand Down Expand Up @@ -195,7 +196,7 @@ func (t *testCheckSuite) TestVerifyDumpPrivileges(c *tc.C) {
func (t *testCheckSuite) TestVerifyReplicationPrivileges(c *tc.C) {
cases := []struct {
grants []string
checkTables map[string][]string
checkTables []*filter.Table
replicationState State
errMatch string
}{
Expand Down
Loading