From ccb6b3edc5851b4916e66310c614afb0955aea76 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 14:01:10 +0800 Subject: [PATCH 1/6] change --- lightning/pkg/importer/import.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 04218f2f1102e..9ce60be42f007 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -503,7 +503,7 @@ func NewImportControllerWithPauser( errorSummaries: makeErrorSummaries(log.FromContext(ctx)), checkpointsDB: cpdb, - saveCpCh: make(chan saveCp), + saveCpCh: make(chan saveCp, cfg.App.RegionConcurrency), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), // Currently, TiDB add index acceration doesn't support multiple tables simultaneously. // So we use a single worker to ensure at most one table is adding index at the same time. @@ -877,8 +877,7 @@ func (rc *Controller) listenCheckpointUpdates(logger log.Logger) { } }() - for scp := range rc.saveCpCh { - lock.Lock() + coalesceSaveRequests := func(scp saveCp) { cpd, ok := coalesed[scp.tableName] if !ok { cpd = checkpoints.NewTableCheckpointDiff() @@ -888,14 +887,25 @@ func (rc *Controller) listenCheckpointUpdates(logger log.Logger) { if scp.waitCh != nil { waiters = append(waiters, scp.waitCh) } + } + for scp := range rc.saveCpCh { + lock.Lock() + coalesceSaveRequests(scp) + for i := 0; i < len(rc.saveCpCh); i++ { + select { + case scp := <-rc.saveCpCh: + coalesceSaveRequests(scp) + default: + break + } + } + lock.Unlock() if len(hasCheckpoint) == 0 { rc.checkpointsWg.Add(1) hasCheckpoint <- struct{}{} } - lock.Unlock() - //nolint:scopelint // This would be either INLINED or ERASED, at compile time. failpoint.Inject("FailIfImportedChunk", func() { if merger, ok := scp.merger.(*checkpoints.ChunkCheckpointMerger); ok && merger.Pos >= merger.EndOffset { From eafed129e8d43664a90cf5dfa7b5a1a23ffd8b39 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 15:13:58 +0800 Subject: [PATCH 2/6] Revert "change" This reverts commit ccb6b3edc5851b4916e66310c614afb0955aea76. --- lightning/pkg/importer/import.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 9ce60be42f007..04218f2f1102e 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -503,7 +503,7 @@ func NewImportControllerWithPauser( errorSummaries: makeErrorSummaries(log.FromContext(ctx)), checkpointsDB: cpdb, - saveCpCh: make(chan saveCp, cfg.App.RegionConcurrency), + saveCpCh: make(chan saveCp), closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), // Currently, TiDB add index acceration doesn't support multiple tables simultaneously. // So we use a single worker to ensure at most one table is adding index at the same time. @@ -877,7 +877,8 @@ func (rc *Controller) listenCheckpointUpdates(logger log.Logger) { } }() - coalesceSaveRequests := func(scp saveCp) { + for scp := range rc.saveCpCh { + lock.Lock() cpd, ok := coalesed[scp.tableName] if !ok { cpd = checkpoints.NewTableCheckpointDiff() @@ -887,25 +888,14 @@ func (rc *Controller) listenCheckpointUpdates(logger log.Logger) { if scp.waitCh != nil { waiters = append(waiters, scp.waitCh) } - } - for scp := range rc.saveCpCh { - lock.Lock() - coalesceSaveRequests(scp) - for i := 0; i < len(rc.saveCpCh); i++ { - select { - case scp := <-rc.saveCpCh: - coalesceSaveRequests(scp) - default: - break - } - } - lock.Unlock() if len(hasCheckpoint) == 0 { rc.checkpointsWg.Add(1) hasCheckpoint <- struct{}{} } + lock.Unlock() + //nolint:scopelint // This would be either INLINED or ERASED, at compile time. failpoint.Inject("FailIfImportedChunk", func() { if merger, ok := scp.merger.(*checkpoints.ChunkCheckpointMerger); ok && merger.Pos >= merger.EndOffset { From 3975db86c0c9b3e17abd0e9d80e63e7a00f9a567 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 16:23:29 +0800 Subject: [PATCH 3/6] change --- lightning/pkg/importer/import.go | 6 ++-- pkg/lightning/checkpoints/checkpoints.go | 37 +++++++++++++++++------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 04218f2f1102e..dfb441afdc8dc 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -640,8 +640,10 @@ func (rc *Controller) initCheckpoint(ctx context.Context) error { log.FromContext(ctx).Warn("exit triggered", zap.String("failpoint", "InitializeCheckpointExit")) os.Exit(0) }) - if err := rc.loadDesiredTableInfos(ctx); err != nil { - return err + if rc.cfg.TikvImporter.AddIndexBySQL { + if err := rc.loadDesiredTableInfos(ctx); err != nil { + return err + } } rc.checkpointsWg.Add(1) // checkpointsWg will be done in `rc.listenCheckpointUpdates` diff --git a/pkg/lightning/checkpoints/checkpoints.go b/pkg/lightning/checkpoints/checkpoints.go index 186ee9b90ffe3..fe5e4168165a9 100644 --- a/pkg/lightning/checkpoints/checkpoints.go +++ b/pkg/lightning/checkpoints/checkpoints.go @@ -800,9 +800,13 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf for _, db := range dbInfo { for _, table := range db.Tables { tableName := common.UniqueTable(db.Name, table.Name) - tableInfo, err := json.Marshal(table.Desired) - if err != nil { - return errors.Trace(err) + var tableInfo []byte + if cfg.TikvImporter.AddIndexBySQL { + // see comments in FileCheckpointsDB.Initialize + tableInfo, err = json.Marshal(table.Desired) + if err != nil { + return errors.Trace(err) + } } _, err = stmt.ExecContext(c, cfg.TaskID, tableName, CheckpointStatusLoaded, table.ID, tableInfo) if err != nil { @@ -933,8 +937,10 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab return errors.NotFoundf("checkpoint for table %s", tableName) } } - if err := json.Unmarshal(rawTableInfo, &cp.TableInfo); err != nil { - return errors.Trace(err) + if len(rawTableInfo) > 0 { + if err := json.Unmarshal(rawTableInfo, &cp.TableInfo); err != nil { + return errors.Trace(err) + } } cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum) cp.Status = CheckpointStatus(status) @@ -1246,13 +1252,22 @@ func (cpdb *FileCheckpointsDB) Initialize(_ context.Context, cfg *config.Config, cpdb.checkpoints.Checkpoints = make(map[string]*checkpointspb.TableCheckpointModel) } + var err error for _, db := range dbInfo { for _, table := range db.Tables { tableName := common.UniqueTable(db.Name, table.Name) if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok { - tableInfo, err := json.Marshal(table.Desired) - if err != nil { - return errors.Trace(err) + var tableInfo []byte + if cfg.TikvImporter.AddIndexBySQL { + // tableInfo is quite large in most case, when importing many + // small tables, writing tableInfo will make checkpoint file + // vary large, and save checkpoint will become bottleneck, so + // we only store table info if we are going to add index by SQL + // where it's required. + tableInfo, err = json.Marshal(table.Desired) + if err != nil { + return errors.Trace(err) + } } cpdb.checkpoints.Checkpoints[tableName] = &checkpointspb.TableCheckpointModel{ Status: uint32(CheckpointStatusLoaded), @@ -1308,8 +1323,10 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC } var tableInfo *model.TableInfo - if err := json.Unmarshal(tableModel.TableInfo, &tableInfo); err != nil { - return nil, errors.Trace(err) + if len(tableModel.TableInfo) > 0 { + if err := json.Unmarshal(tableModel.TableInfo, &tableInfo); err != nil { + return nil, errors.Trace(err) + } } cp := &TableCheckpoint{ From d1a8fd359b5d509c1c7faa414efc1a69413d6686 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 17:01:30 +0800 Subject: [PATCH 4/6] ut --- .../checkpoints/checkpoints_file_test.go | 71 ++++++++---- .../checkpoints/checkpoints_sql_test.go | 109 ++++++++++++++++++ 2 files changed, 158 insertions(+), 22 deletions(-) diff --git a/pkg/lightning/checkpoints/checkpoints_file_test.go b/pkg/lightning/checkpoints/checkpoints_file_test.go index df2b84f50a3e4..7213dc9aa3e09 100644 --- a/pkg/lightning/checkpoints/checkpoints_file_test.go +++ b/pkg/lightning/checkpoints/checkpoints_file_test.go @@ -41,7 +41,7 @@ func newTestConfig() *config.Config { return cfg } -func newFileCheckpointsDB(t *testing.T) *checkpoints.FileCheckpointsDB { +func newFileCheckpointsDB(t *testing.T, addIndexBySQL bool) *checkpoints.FileCheckpointsDB { dir := t.TempDir() ctx := context.Background() cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, filepath.Join(dir, "cp.pb")) @@ -49,6 +49,7 @@ func newFileCheckpointsDB(t *testing.T) *checkpoints.FileCheckpointsDB { // 2. initialize with checkpoint data. cfg := newTestConfig() + cfg.TikvImporter.AddIndexBySQL = addIndexBySQL err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{ "db1": { Name: "db1", @@ -164,13 +165,7 @@ func setInvalidStatus(cpdb *checkpoints.FileCheckpointsDB) { func TestGet(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) - - // 5. get back the checkpoints - - cp, err := cpdb.Get(ctx, "`db1`.`t2`") - require.NoError(t, err) - expect := &checkpoints.TableCheckpoint{ + expectT2 := &checkpoints.TableCheckpoint{ Status: checkpoints.CheckpointStatusAllWritten, AllocBase: 132861, Checksum: verification.MakeKVChecksum(4492, 686, 486070148910), @@ -203,11 +198,18 @@ func TestGet(t *testing.T) { }, }, } - require.Equal(t, expect, cp) - cp, err = cpdb.Get(ctx, "`db2`.`t3`") - require.NoError(t, err) - expect = &checkpoints.TableCheckpoint{ + expectT3 := &checkpoints.TableCheckpoint{ + Status: checkpoints.CheckpointStatusLoaded, + Engines: map[int32]*checkpoints.EngineCheckpoint{ + -1: { + Status: checkpoints.CheckpointStatusLoaded, + Chunks: []*checkpoints.ChunkCheckpoint{}, + }, + }, + } + + expectT3AddIndexBySQL := &checkpoints.TableCheckpoint{ Status: checkpoints.CheckpointStatusLoaded, Engines: map[int32]*checkpoints.EngineCheckpoint{ -1: { @@ -219,16 +221,41 @@ func TestGet(t *testing.T) { Name: model.NewCIStr("t3"), }, } - require.Equal(t, expect, cp) - cp, err = cpdb.Get(ctx, "`db3`.`not-exists`") - require.Nil(t, cp) - require.True(t, errors.IsNotFound(err)) + t.Run("normal", func(t *testing.T) { + cpdb := newFileCheckpointsDB(t, false) + cp, err := cpdb.Get(ctx, "`db1`.`t2`") + require.NoError(t, err) + require.Equal(t, expectT2, cp) + + cp, err = cpdb.Get(ctx, "`db2`.`t3`") + require.NoError(t, err) + require.Equal(t, expectT3, cp) + + cp, err = cpdb.Get(ctx, "`db3`.`not-exists`") + require.Nil(t, cp) + require.True(t, errors.IsNotFound(err)) + }) + + t.Run("add-index-by-sql", func(t *testing.T) { + cpdb := newFileCheckpointsDB(t, true) + cp, err := cpdb.Get(ctx, "`db1`.`t2`") + require.NoError(t, err) + require.Equal(t, expectT2, cp) + + cp, err = cpdb.Get(ctx, "`db2`.`t3`") + require.NoError(t, err) + require.Equal(t, expectT3AddIndexBySQL, cp) + + cp, err = cpdb.Get(ctx, "`db3`.`not-exists`") + require.Nil(t, cp) + require.True(t, errors.IsNotFound(err)) + }) } func TestRemoveAllCheckpoints(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) err := cpdb.RemoveCheckpoint(ctx, "all") require.NoError(t, err) @@ -244,7 +271,7 @@ func TestRemoveAllCheckpoints(t *testing.T) { func TestRemoveOneCheckpoint(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) err := cpdb.RemoveCheckpoint(ctx, "`db1`.`t2`") require.NoError(t, err) @@ -260,7 +287,7 @@ func TestRemoveOneCheckpoint(t *testing.T) { func TestIgnoreAllErrorCheckpoints(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) setInvalidStatus(cpdb) @@ -278,7 +305,7 @@ func TestIgnoreAllErrorCheckpoints(t *testing.T) { func TestIgnoreOneErrorCheckpoints(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) setInvalidStatus(cpdb) @@ -296,7 +323,7 @@ func TestIgnoreOneErrorCheckpoints(t *testing.T) { func TestDestroyAllErrorCheckpoints(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) setInvalidStatus(cpdb) @@ -328,7 +355,7 @@ func TestDestroyAllErrorCheckpoints(t *testing.T) { func TestDestroyOneErrorCheckpoint(t *testing.T) { ctx := context.Background() - cpdb := newFileCheckpointsDB(t) + cpdb := newFileCheckpointsDB(t, false) setInvalidStatus(cpdb) diff --git a/pkg/lightning/checkpoints/checkpoints_sql_test.go b/pkg/lightning/checkpoints/checkpoints_sql_test.go index cbd6b812ba8d6..6b53ff203bbf0 100644 --- a/pkg/lightning/checkpoints/checkpoints_sql_test.go +++ b/pkg/lightning/checkpoints/checkpoints_sql_test.go @@ -78,6 +78,114 @@ func TestNormalOperations(t *testing.T) { ctx := context.Background() s := newCPSQLSuite(t) cpdb := s.cpdb + s.mock.ExpectBegin() + initializeStmt := s.mock.ExpectPrepare( + "REPLACE INTO `mock-schema`\\.`task_v\\d+`") + initializeStmt.ExpectExec(). + WithArgs(123, "/data", "local", "127.0.0.1:8287", "127.0.0.1", 4000, "127.0.0.1:2379", "/tmp/sorted-kv", build.ReleaseVersion). + WillReturnResult(sqlmock.NewResult(6, 1)) + initializeStmt = s.mock. + ExpectPrepare("INSERT INTO `mock-schema`\\.`table_v\\d+`") + initializeStmt.ExpectExec(). + WithArgs(123, "`db1`.`t2`", sqlmock.AnyArg(), int64(2), []byte(nil)). + WillReturnResult(sqlmock.NewResult(8, 1)) + s.mock.ExpectCommit() + + s.mock.MatchExpectationsInOrder(false) + cfg := newTestConfig() + err := cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{ + "db1": { + Name: "db1", + Tables: map[string]*checkpoints.TidbTableInfo{ + "t2": { + Name: "t2", + ID: 2, + Desired: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + }, + }, + }, + }) + s.mock.MatchExpectationsInOrder(true) + require.NoError(t, err) + require.Nil(t, s.mock.ExpectationsWereMet()) + + s.mock.ExpectBegin() + s.mock. + ExpectQuery("SELECT .+ FROM `mock-schema`\\.`engine_v\\d+`"). + WithArgs("`db1`.`t2`"). + WillReturnRows( + sqlmock.NewRows([]string{"engine_id", "status"}). + AddRow(0, 120). + AddRow(-1, 30), + ) + s.mock. + ExpectQuery("SELECT (?s:.+) FROM `mock-schema`\\.`chunk_v\\d+`"). + WithArgs("`db1`.`t2`"). + WillReturnRows( + sqlmock.NewRows([]string{ + "engine_id", "path", "offset", "type", "compression", "sort_key", "file_size", "columns", + "pos", "end_offset", "prev_rowid_max", "rowid_max", + "kvc_bytes", "kvc_kvs", "kvc_checksum", "unix_timestamp(create_time)", + }). + AddRow( + 0, "/tmp/path/1.sql", 0, mydump.SourceTypeSQL, 0, "", 123, "[]", + 55904, 102400, 681, 5000, + 4491, 586, 486070148917, 1234567894, + ), + ) + s.mock. + ExpectQuery("SELECT .+ FROM `mock-schema`\\.`table_v\\d+`"). + WithArgs("`db1`.`t2`"). + WillReturnRows( + sqlmock.NewRows([]string{"status", "alloc_base", "table_id", "table_info", "kv_bytes", "kv_kvs", "kv_checksum"}). + AddRow(60, 132861, int64(2), nil, uint64(4492), uint64(686), uint64(486070148910)), + ) + s.mock.ExpectCommit() + + cp, err := cpdb.Get(ctx, "`db1`.`t2`") + require.Nil(t, err) + require.Equal(t, &checkpoints.TableCheckpoint{ + Status: checkpoints.CheckpointStatusAllWritten, + AllocBase: 132861, + TableID: int64(2), + TableInfo: nil, + Engines: map[int32]*checkpoints.EngineCheckpoint{ + -1: {Status: checkpoints.CheckpointStatusLoaded}, + 0: { + Status: checkpoints.CheckpointStatusImported, + Chunks: []*checkpoints.ChunkCheckpoint{{ + Key: checkpoints.ChunkCheckpointKey{ + Path: "/tmp/path/1.sql", + Offset: 0, + }, + FileMeta: mydump.SourceFileMeta{ + Path: "/tmp/path/1.sql", + Type: mydump.SourceTypeSQL, + FileSize: 123, + }, + ColumnPermutation: []int{}, + Chunk: mydump.Chunk{ + Offset: 55904, + EndOffset: 102400, + PrevRowIDMax: 681, + RowIDMax: 5000, + }, + Checksum: verification.MakeKVChecksum(4491, 586, 486070148917), + Timestamp: 1234567894, + }}, + }, + }, + Checksum: verification.MakeKVChecksum(4492, 686, 486070148910), + }, cp) + require.Nil(t, s.mock.ExpectationsWereMet()) +} + +func TestNormalOperationsWithAddIndexBySQL(t *testing.T) { + ctx := context.Background() + s := newCPSQLSuite(t) + cpdb := s.cpdb // 2. initialize with checkpoint data. @@ -115,6 +223,7 @@ func TestNormalOperations(t *testing.T) { s.mock.MatchExpectationsInOrder(false) cfg := newTestConfig() + cfg.TikvImporter.AddIndexBySQL = true err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{ "db1": { Name: "db1", From a08c1dab3ceb8fe2d1943f74ed5deb8b0b9ca8b5 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 17:39:53 +0800 Subject: [PATCH 5/6] i t --- .../lightning_add_index/config2-file.toml | 8 ++++ .../{config2.toml => config2-mysql.toml} | 0 lightning/tests/lightning_add_index/run.sh | 38 +++++++++++-------- pkg/lightning/checkpoints/checkpoints.go | 2 +- .../checkpoints/checkpoints_sql_test.go | 2 +- 5 files changed, 32 insertions(+), 18 deletions(-) create mode 100644 lightning/tests/lightning_add_index/config2-file.toml rename lightning/tests/lightning_add_index/{config2.toml => config2-mysql.toml} (100%) diff --git a/lightning/tests/lightning_add_index/config2-file.toml b/lightning/tests/lightning_add_index/config2-file.toml new file mode 100644 index 0000000000000..6c470b3cc531b --- /dev/null +++ b/lightning/tests/lightning_add_index/config2-file.toml @@ -0,0 +1,8 @@ +[checkpoint] +enable = true +driver = "file" +dsn = "/tmp/add-index-by-sql-checkpoint.pb" + +[tikv-importer] +backend = 'local' +add-index-by-sql = true diff --git a/lightning/tests/lightning_add_index/config2.toml b/lightning/tests/lightning_add_index/config2-mysql.toml similarity index 100% rename from lightning/tests/lightning_add_index/config2.toml rename to lightning/tests/lightning_add_index/config2-mysql.toml diff --git a/lightning/tests/lightning_add_index/run.sh b/lightning/tests/lightning_add_index/run.sh index d82b74b2a52b1..6f3fba046abeb 100644 --- a/lightning/tests/lightning_add_index/run.sh +++ b/lightning/tests/lightning_add_index/run.sh @@ -29,7 +29,7 @@ non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}') run_sql "DROP DATABASE add_index;" -run_lightning --config "$CUR/config2.toml" --log-file "$LOG_FILE2" +run_lightning --config "$CUR/config2-mysql.toml" --log-file "$LOG_FILE2" actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}') actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}') actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}') @@ -82,18 +82,24 @@ grep -Fq "ALTER TABLE \`add_index\`.\`non_pk_auto_inc\` DROP PRIMARY KEY" "$LOG_ grep -Fq "ALTER TABLE \`add_index\`.\`non_pk_auto_inc\` ADD PRIMARY KEY (\`pk\`)" "$LOG_FILE2" # 3. Check for recovering from checkpoint -export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/AddIndexCrash=return()" -run_sql "DROP DATABASE add_index;" -run_lightning --enable-checkpoint=1 --config "$CUR/config2.toml" --log-file "$LOG_FILE2" -grep -Fq "task canceled" "$LOG_FILE2" - -unset GO_FAILPOINTS -run_lightning --enable-checkpoint=1 --config "$CUR/config2.toml" --log-file "$LOG_FILE2" -actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}') -actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}') -actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}') -actual_non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}') - -set -x -[ "$multi_indexes_kvs" == "$actual_multi_indexes_kvs" ] && [ "$multi_indexes_cksum" == "$actual_multi_indexes_cksum" ] -[ "$non_pk_auto_inc_kvs" == "$actual_non_pk_auto_inc_kvs" ] && [ "$non_pk_auto_inc_cksum" == "$actual_non_pk_auto_inc_cksum" ] +function recover_from_checkpoint() { + tp=$1 + export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/AddIndexCrash=return()" + run_sql "DROP DATABASE add_index;" + rm -rf /tmp/add-index-by-sql-checkpoint.pb + run_lightning --enable-checkpoint=1 --config "$CUR/config2-$tp.toml" --log-file "$LOG_FILE2" + grep -Fq "task canceled" "$LOG_FILE2" + + unset GO_FAILPOINTS + run_lightning --enable-checkpoint=1 --config "$CUR/config2-$tp.toml" --log-file "$LOG_FILE2" + actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}') + actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}') + actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}') + actual_non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}') + + set -x + [ "$multi_indexes_kvs" == "$actual_multi_indexes_kvs" ] && [ "$multi_indexes_cksum" == "$actual_multi_indexes_cksum" ] + [ "$non_pk_auto_inc_kvs" == "$actual_non_pk_auto_inc_kvs" ] && [ "$non_pk_auto_inc_cksum" == "$actual_non_pk_auto_inc_cksum" ] +} +recover_from_checkpoint file +recover_from_checkpoint mysql diff --git a/pkg/lightning/checkpoints/checkpoints.go b/pkg/lightning/checkpoints/checkpoints.go index fe5e4168165a9..81497cc4dc5c0 100644 --- a/pkg/lightning/checkpoints/checkpoints.go +++ b/pkg/lightning/checkpoints/checkpoints.go @@ -800,7 +800,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf for _, db := range dbInfo { for _, table := range db.Tables { tableName := common.UniqueTable(db.Name, table.Name) - var tableInfo []byte + tableInfo := []byte("") if cfg.TikvImporter.AddIndexBySQL { // see comments in FileCheckpointsDB.Initialize tableInfo, err = json.Marshal(table.Desired) diff --git a/pkg/lightning/checkpoints/checkpoints_sql_test.go b/pkg/lightning/checkpoints/checkpoints_sql_test.go index 6b53ff203bbf0..e3e7081e768b0 100644 --- a/pkg/lightning/checkpoints/checkpoints_sql_test.go +++ b/pkg/lightning/checkpoints/checkpoints_sql_test.go @@ -87,7 +87,7 @@ func TestNormalOperations(t *testing.T) { initializeStmt = s.mock. ExpectPrepare("INSERT INTO `mock-schema`\\.`table_v\\d+`") initializeStmt.ExpectExec(). - WithArgs(123, "`db1`.`t2`", sqlmock.AnyArg(), int64(2), []byte(nil)). + WithArgs(123, "`db1`.`t2`", sqlmock.AnyArg(), int64(2), []byte("")). WillReturnResult(sqlmock.NewResult(8, 1)) s.mock.ExpectCommit() From 9e5609c47d07eecba288b6d0f12a1e12cfdb3960 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 6 Aug 2024 17:40:43 +0800 Subject: [PATCH 6/6] change --- pkg/lightning/checkpoints/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/checkpoints/BUILD.bazel b/pkg/lightning/checkpoints/BUILD.bazel index 90c521225490a..d7c7882ee2042 100644 --- a/pkg/lightning/checkpoints/BUILD.bazel +++ b/pkg/lightning/checkpoints/BUILD.bazel @@ -36,7 +36,7 @@ go_test( embed = [":checkpoints"], flaky = True, race = "on", - shard_count = 23, + shard_count = 24, deps = [ "//br/pkg/version/build", "//pkg/lightning/checkpoints/checkpointspb",