Skip to content

Commit

Permalink
lightning: add PK to internal tables (#57912)
Browse files Browse the repository at this point in the history
close #57479
  • Loading branch information
lance6716 authored Dec 4, 2024
1 parent 043986d commit 28585c8
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 35 deletions.
3 changes: 3 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
// in order to find the correct region.
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 1; i < 100; i++ {
if err := ctx.Err(); err != nil {
return nil, err
}
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,35 +491,35 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)").
WillReturnResult(driver.ResultNoRows)

Expand Down Expand Up @@ -557,21 +557,21 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v2.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ const (
// DupeResAlgNone doesn't detect duplicate.
DupeResAlgNone DuplicateResolutionAlgorithm = iota

// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB.
// DupeResAlgRecord only records duplicate records to `lightning_task_info.conflict_error_v1_2` table on the target TiDB.
DupeResAlgRecord

// DupeResAlgRemove records all duplicate records like the 'record' algorithm and remove all information related to the
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1 table to add back the correct rows.
// duplicated rows. Users need to analyze the lightning_task_info.conflict_error_v1_2 table to add back the correct rows.
DupeResAlgRemove

// DupeResAlgErr reports an error and stops the import process.
Expand Down
15 changes: 9 additions & 6 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ const (
CREATE SCHEMA IF NOT EXISTS %s;
`

syntaxErrorTableName = "syntax_error_v1"
typeErrorTableName = "type_error_v1"
syntaxErrorTableName = "syntax_error_v2"
typeErrorTableName = "type_error_v2"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"
ConflictErrorTableName = "conflict_error_v1_2"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
id bigint PRIMARY KEY AUTO_INCREMENT,
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -59,6 +60,7 @@ const (

createTypeErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + typeErrorTableName + ` (
id bigint PRIMARY KEY AUTO_INCREMENT,
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -71,6 +73,7 @@ const (

createConflictErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + ConflictErrorTableName + ` (
id bigint PRIMARY KEY AUTO_INCREMENT,
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand Down Expand Up @@ -108,10 +111,10 @@ const (
sqlValuesConflictErrorIndex = "(?,?,?,?,?,?,?,?,?)"

selectConflictKeys = `
SELECT _tidb_rowid, raw_handle, raw_row
SELECT id, raw_handle, raw_row
FROM %s.` + ConflictErrorTableName + `
WHERE table_name = ? AND _tidb_rowid >= ? and _tidb_rowid < ?
ORDER BY _tidb_rowid LIMIT ?;
WHERE table_name = ? AND id >= ? and id < ?
ORDER BY id LIMIT ?;
`
)

Expand Down
20 changes: 10 additions & 10 deletions br/pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestInit(t *testing.T) {
em.dupResolution = config.DupeResAlgRecord
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*").
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1_2.*").
WillReturnResult(sqlmock.NewResult(2, 1))
err = em.Init(ctx)
require.NoError(t, err)
Expand All @@ -64,17 +64,17 @@ func TestInit(t *testing.T) {
em.remainingError.Type.Store(1)
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`;").
WillReturnResult(sqlmock.NewResult(3, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*").
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v2.*").
WillReturnResult(sqlmock.NewResult(4, 1))
err = em.Init(ctx)
require.NoError(t, err)
em.dupResolution = config.DupeResAlgRecord
em.remainingError.Type.Store(1)
mock.ExpectExec("CREATE SCHEMA IF NOT EXISTS `lightning_errors`.*").
WillReturnResult(sqlmock.NewResult(5, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v1.*").
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.type_error_v2.*").
WillReturnResult(sqlmock.NewResult(6, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*").
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1_2.*").
WillReturnResult(sqlmock.NewResult(7, 1))
err = em.Init(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -111,7 +111,7 @@ type mockRows struct {
}

func (r *mockRows) Columns() []string {
return []string{"_tidb_rowid", "raw_handle", "raw_row"}
return []string{"id", "raw_handle", "raw_row"}
}

func (r *mockRows) Close() error { return nil }
Expand All @@ -120,15 +120,15 @@ func (r *mockRows) Next(dest []driver.Value) error {
if r.start >= r.end {
return io.EOF
}
dest[0] = r.start // _tidb_rowid
dest[0] = r.start // id
dest[1] = []byte{} // raw_handle
dest[2] = []byte{} // raw_row
r.start++
return nil
}

func (c mockConn) QueryContext(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
expectedQuery := "SELECT _tidb_rowid, raw_handle, raw_row.*"
expectedQuery := "SELECT id, raw_handle, raw_row.*"
if err := sqlmock.QueryMatcherRegexp.Match(expectedQuery, query); err != nil {
return &mockRows{}, nil
}
Expand Down Expand Up @@ -241,15 +241,15 @@ func TestErrorMgrErrorOutput(t *testing.T) {
em.remainingError.Syntax.Sub(1)
output = em.Output()
checkStr := strings.ReplaceAll(output, "\n", "")
expected := "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+"
expected := "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 1 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|+---+-------------+-------------+--------------------------------+"
require.Equal(t, expected, checkStr)

em.remainingError = cfg.App.MaxError
em.remainingError.Syntax.Sub(10)
em.remainingError.Type.Store(10)
output = em.Output()
checkStr = strings.ReplaceAll(output, "\n", "")
expected = "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m|+---+-------------+-------------+--------------------------------+"
expected = "Import Data Error Summary: +---+-------------+-------------+--------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+-------------+-------------+--------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 90 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 10 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m|+---+-------------+-------------+--------------------------------+"
require.Equal(t, expected, checkStr)

// change multiple keys
Expand All @@ -260,6 +260,6 @@ func TestErrorMgrErrorOutput(t *testing.T) {
em.remainingError.Conflict.Store(0)
output = em.Output()
checkStr = strings.ReplaceAll(output, "\n", "")
expected = "Import Data Error Summary: +---+---------------------+-------------+----------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+---------------------+-------------+----------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v1` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v1` \x1b[0m||\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m||\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1` \x1b[0m|+---+---------------------+-------------+----------------------------------+"
expected = "Import Data Error Summary: +---+---------------------+-------------+------------------------------------+| # | ERROR TYPE | ERROR COUNT | ERROR DATA TABLE |+---+---------------------+-------------+------------------------------------+|\x1b[31m 1 \x1b[0m|\x1b[31m Data Type \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`type_error_v2` \x1b[0m||\x1b[31m 2 \x1b[0m|\x1b[31m Data Syntax \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`syntax_error_v2` \x1b[0m||\x1b[31m 3 \x1b[0m|\x1b[31m Charset Error \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m \x1b[0m||\x1b[31m 4 \x1b[0m|\x1b[31m Unique Key Conflict \x1b[0m|\x1b[31m 100 \x1b[0m|\x1b[31m `error_info`.`conflict_error_v1_2` \x1b[0m|+---+---------------------+-------------+------------------------------------+"
require.Equal(t, expected, checkStr)
}
2 changes: 1 addition & 1 deletion br/tests/lightning_duplicate_detection/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ verify_detected_rows() {
done
done
mapfile -t expect_rows < <(for row in "${expect_rows[@]}"; do echo "$row"; done | sort | uniq)
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v1 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
mapfile -t actual_rows < <(run_sql "SELECT row_data FROM lightning_task_info.conflict_error_v1_2 WHERE table_name = \"\`dup_detect\`.\`${table}\`\"" |
grep "row_data:" | sed 's/^.*(//' | sed 's/).*$//' | sed 's/"//g' | sed 's/, */,/g' | sort | uniq)
equal=0
if [ "${#actual_rows[@]}" = "${#expect_rows[@]}" ]; then
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_issue_40657/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ run_lightning -d "tests/$TEST_NAME/data1"
run_sql 'admin check table test.t'
run_sql 'select count(*) from test.t'
check_contains 'count(*): 3'
run_sql 'select count(*) from lightning_task_info.conflict_error_v1'
run_sql 'select count(*) from lightning_task_info.conflict_error_v1_2'
check_contains 'count(*): 2'

run_sql 'truncate table test.t'
Expand Down
10 changes: 5 additions & 5 deletions br/tests/lightning_sqlmode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,28 @@ run_sql 'SELECT min(id), max(id) FROM sqlmodedb.t'
check_contains 'min(id): 4'
check_contains 'max(id): 4'

run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v1'
run_sql 'SELECT count(*) FROM sqlmodedb_lightning_task_info.type_error_v2'
check_contains 'count(*): 4'

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(1,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 53'
check_contains 'cannot convert datum from unsigned bigint to type timestamp.'
check_contains "row_data: (1,9,128,'too long','x,y,z')"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(2,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 100'
check_contains "Incorrect timestamp value: '2000-00-00 00:00:00'"
check_contains "row_data: (2,'2000-00-00 00:00:00',-99999,'🤩',3)"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(3,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 149'
check_contains "Incorrect timestamp value: '9999-12-31 23:59:59'"
check_contains "row_data: (3,'9999-12-31 23:59:59','NaN',x'99','x+y')"

run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v1 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";'
run_sql 'SELECT path, `offset`, error, row_data FROM sqlmodedb_lightning_task_info.type_error_v2 WHERE table_name = "`sqlmodedb`.`t`" AND row_data LIKE "(5,%";'
check_contains 'path: sqlmodedb.t.1.sql'
check_contains 'offset: 237'
check_contains "Column 'a' cannot be null"
Expand Down
2 changes: 1 addition & 1 deletion br/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ addr = "127.0.0.1:8287"
# Current supports three resolution algorithms:
# - none: doesn't detect duplicate records, which has the best performance of the three algorithms, but probably leads to
# inconsistent data in the target TiDB.
# - record: only records duplicate records to `lightning_task_info.conflict_error_v1` table on the target TiDB. Note that this
# - record: only records duplicate records to `lightning_task_info.conflict_error_v1_2` table on the target TiDB. Note that this
# required the version of target TiKV version is no less than v5.2.0, otherwise it will fallback to 'none'.
# - remove: records all duplicate records like the 'record' algorithm and remove all duplicate records to ensure a consistent
# state in the target TiDB.
Expand Down
2 changes: 1 addition & 1 deletion tools/check/check-bazel-prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# -o pipefail: sets the exit code of a pipeline to that of the rightmost command to exit with a non-zero status,
# or to zero if all commands of the pipeline exit successfully.
set -euo pipefail

rm -rf /home/jenkins/.cache/bazel/_bazel_jenkins/install/a09dbb90c658248f08f9aa0eba11997d
before_checksum=`find . -type f \( -name '*.bazel' -o -name '*.bzl' \) -exec md5sum {} \;| sort -k 2`
make bazel_prepare
after_checksum=`find . -type f \( -name '*.bazel' -o -name '*.bzl' \) -exec md5sum {} \;| sort -k 2`
Expand Down

0 comments on commit 28585c8

Please sign in to comment.