Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: optimize checkpoint size #55230

Merged
merged 6 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,10 @@ func (rc *Controller) initCheckpoint(ctx context.Context) error {
log.FromContext(ctx).Warn("exit triggered", zap.String("failpoint", "InitializeCheckpointExit"))
os.Exit(0)
})
if err := rc.loadDesiredTableInfos(ctx); err != nil {
return err
if rc.cfg.TikvImporter.AddIndexBySQL {
if err := rc.loadDesiredTableInfos(ctx); err != nil {
return err
}
}

rc.checkpointsWg.Add(1) // checkpointsWg will be done in `rc.listenCheckpointUpdates`
Expand Down
8 changes: 8 additions & 0 deletions lightning/tests/lightning_add_index/config2-file.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[checkpoint]
enable = true
driver = "file"
dsn = "/tmp/add-index-by-sql-checkpoint.pb"

[tikv-importer]
backend = 'local'
add-index-by-sql = true
38 changes: 22 additions & 16 deletions lightning/tests/lightning_add_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;"
non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}')

run_sql "DROP DATABASE add_index;"
run_lightning --config "$CUR/config2.toml" --log-file "$LOG_FILE2"
run_lightning --config "$CUR/config2-mysql.toml" --log-file "$LOG_FILE2"
actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}')
actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}')
actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}')
Expand Down Expand Up @@ -82,18 +82,24 @@ grep -Fq "ALTER TABLE \`add_index\`.\`non_pk_auto_inc\` DROP PRIMARY KEY" "$LOG_
grep -Fq "ALTER TABLE \`add_index\`.\`non_pk_auto_inc\` ADD PRIMARY KEY (\`pk\`)" "$LOG_FILE2"

# 3. Check for recovering from checkpoint
export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/AddIndexCrash=return()"
run_sql "DROP DATABASE add_index;"
run_lightning --enable-checkpoint=1 --config "$CUR/config2.toml" --log-file "$LOG_FILE2"
grep -Fq "task canceled" "$LOG_FILE2"

unset GO_FAILPOINTS
run_lightning --enable-checkpoint=1 --config "$CUR/config2.toml" --log-file "$LOG_FILE2"
actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}')
actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}')
actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}')
actual_non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}')

set -x
[ "$multi_indexes_kvs" == "$actual_multi_indexes_kvs" ] && [ "$multi_indexes_cksum" == "$actual_multi_indexes_cksum" ]
[ "$non_pk_auto_inc_kvs" == "$actual_non_pk_auto_inc_kvs" ] && [ "$non_pk_auto_inc_cksum" == "$actual_non_pk_auto_inc_cksum" ]
function recover_from_checkpoint() {
tp=$1
export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/AddIndexCrash=return()"
run_sql "DROP DATABASE add_index;"
rm -rf /tmp/add-index-by-sql-checkpoint.pb
run_lightning --enable-checkpoint=1 --config "$CUR/config2-$tp.toml" --log-file "$LOG_FILE2"
grep -Fq "task canceled" "$LOG_FILE2"

unset GO_FAILPOINTS
run_lightning --enable-checkpoint=1 --config "$CUR/config2-$tp.toml" --log-file "$LOG_FILE2"
actual_multi_indexes_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Total_kvs" | awk '{print $2}')
actual_multi_indexes_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.multi_indexes;" | grep "Checksum_crc64_xor" | awk '{print $2}')
actual_non_pk_auto_inc_kvs=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Total_kvs" | awk '{print $2}')
actual_non_pk_auto_inc_cksum=$(run_sql "ADMIN CHECKSUM TABLE add_index.non_pk_auto_inc;" | grep "Checksum_crc64_xor" | awk '{print $2}')

set -x
[ "$multi_indexes_kvs" == "$actual_multi_indexes_kvs" ] && [ "$multi_indexes_cksum" == "$actual_multi_indexes_cksum" ]
[ "$non_pk_auto_inc_kvs" == "$actual_non_pk_auto_inc_kvs" ] && [ "$non_pk_auto_inc_cksum" == "$actual_non_pk_auto_inc_cksum" ]
}
recover_from_checkpoint file
recover_from_checkpoint mysql
2 changes: 1 addition & 1 deletion pkg/lightning/checkpoints/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ go_test(
embed = [":checkpoints"],
flaky = True,
race = "on",
shard_count = 23,
shard_count = 24,
deps = [
"//br/pkg/version/build",
"//pkg/lightning/checkpoints/checkpointspb",
Expand Down
37 changes: 27 additions & 10 deletions pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,9 +800,13 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
tableInfo, err := json.Marshal(table.Desired)
if err != nil {
return errors.Trace(err)
tableInfo := []byte("")
if cfg.TikvImporter.AddIndexBySQL {
// see comments in FileCheckpointsDB.Initialize
tableInfo, err = json.Marshal(table.Desired)
if err != nil {
return errors.Trace(err)
}
}
_, err = stmt.ExecContext(c, cfg.TaskID, tableName, CheckpointStatusLoaded, table.ID, tableInfo)
if err != nil {
Expand Down Expand Up @@ -933,8 +937,10 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
return errors.NotFoundf("checkpoint for table %s", tableName)
}
}
if err := json.Unmarshal(rawTableInfo, &cp.TableInfo); err != nil {
return errors.Trace(err)
if len(rawTableInfo) > 0 {
if err := json.Unmarshal(rawTableInfo, &cp.TableInfo); err != nil {
return errors.Trace(err)
}
}
cp.Checksum = verify.MakeKVChecksum(bytes, kvs, checksum)
cp.Status = CheckpointStatus(status)
Expand Down Expand Up @@ -1246,13 +1252,22 @@ func (cpdb *FileCheckpointsDB) Initialize(_ context.Context, cfg *config.Config,
cpdb.checkpoints.Checkpoints = make(map[string]*checkpointspb.TableCheckpointModel)
}

var err error
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok {
tableInfo, err := json.Marshal(table.Desired)
if err != nil {
return errors.Trace(err)
var tableInfo []byte
if cfg.TikvImporter.AddIndexBySQL {
// tableInfo is quite large in most case, when importing many
// small tables, writing tableInfo will make checkpoint file
// vary large, and save checkpoint will become bottleneck, so
// we only store table info if we are going to add index by SQL
// where it's required.
tableInfo, err = json.Marshal(table.Desired)
if err != nil {
return errors.Trace(err)
}
}
cpdb.checkpoints.Checkpoints[tableName] = &checkpointspb.TableCheckpointModel{
Status: uint32(CheckpointStatusLoaded),
Expand Down Expand Up @@ -1308,8 +1323,10 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC
}

var tableInfo *model.TableInfo
if err := json.Unmarshal(tableModel.TableInfo, &tableInfo); err != nil {
return nil, errors.Trace(err)
if len(tableModel.TableInfo) > 0 {
if err := json.Unmarshal(tableModel.TableInfo, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
}

cp := &TableCheckpoint{
Expand Down
71 changes: 49 additions & 22 deletions pkg/lightning/checkpoints/checkpoints_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ func newTestConfig() *config.Config {
return cfg
}

func newFileCheckpointsDB(t *testing.T) *checkpoints.FileCheckpointsDB {
func newFileCheckpointsDB(t *testing.T, addIndexBySQL bool) *checkpoints.FileCheckpointsDB {
dir := t.TempDir()
ctx := context.Background()
cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, filepath.Join(dir, "cp.pb"))
require.NoError(t, err)

// 2. initialize with checkpoint data.
cfg := newTestConfig()
cfg.TikvImporter.AddIndexBySQL = addIndexBySQL
err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
"db1": {
Name: "db1",
Expand Down Expand Up @@ -164,13 +165,7 @@ func setInvalidStatus(cpdb *checkpoints.FileCheckpointsDB) {

func TestGet(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)

// 5. get back the checkpoints

cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
expect := &checkpoints.TableCheckpoint{
expectT2 := &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusAllWritten,
AllocBase: 132861,
Checksum: verification.MakeKVChecksum(4492, 686, 486070148910),
Expand Down Expand Up @@ -203,11 +198,18 @@ func TestGet(t *testing.T) {
},
},
}
require.Equal(t, expect, cp)

cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
expect = &checkpoints.TableCheckpoint{
expectT3 := &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusLoaded,
Engines: map[int32]*checkpoints.EngineCheckpoint{
-1: {
Status: checkpoints.CheckpointStatusLoaded,
Chunks: []*checkpoints.ChunkCheckpoint{},
},
},
}

expectT3AddIndexBySQL := &checkpoints.TableCheckpoint{
Status: checkpoints.CheckpointStatusLoaded,
Engines: map[int32]*checkpoints.EngineCheckpoint{
-1: {
Expand All @@ -219,16 +221,41 @@ func TestGet(t *testing.T) {
Name: model.NewCIStr("t3"),
},
}
require.Equal(t, expect, cp)

cp, err = cpdb.Get(ctx, "`db3`.`not-exists`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
t.Run("normal", func(t *testing.T) {
cpdb := newFileCheckpointsDB(t, false)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
require.Equal(t, expectT2, cp)

cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, expectT3, cp)

cp, err = cpdb.Get(ctx, "`db3`.`not-exists`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
})

t.Run("add-index-by-sql", func(t *testing.T) {
cpdb := newFileCheckpointsDB(t, true)
cp, err := cpdb.Get(ctx, "`db1`.`t2`")
require.NoError(t, err)
require.Equal(t, expectT2, cp)

cp, err = cpdb.Get(ctx, "`db2`.`t3`")
require.NoError(t, err)
require.Equal(t, expectT3AddIndexBySQL, cp)

cp, err = cpdb.Get(ctx, "`db3`.`not-exists`")
require.Nil(t, cp)
require.True(t, errors.IsNotFound(err))
})
}

func TestRemoveAllCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

err := cpdb.RemoveCheckpoint(ctx, "all")
require.NoError(t, err)
Expand All @@ -244,7 +271,7 @@ func TestRemoveAllCheckpoints(t *testing.T) {

func TestRemoveOneCheckpoint(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

err := cpdb.RemoveCheckpoint(ctx, "`db1`.`t2`")
require.NoError(t, err)
Expand All @@ -260,7 +287,7 @@ func TestRemoveOneCheckpoint(t *testing.T) {

func TestIgnoreAllErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

setInvalidStatus(cpdb)

Expand All @@ -278,7 +305,7 @@ func TestIgnoreAllErrorCheckpoints(t *testing.T) {

func TestIgnoreOneErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

setInvalidStatus(cpdb)

Expand All @@ -296,7 +323,7 @@ func TestIgnoreOneErrorCheckpoints(t *testing.T) {

func TestDestroyAllErrorCheckpoints(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

setInvalidStatus(cpdb)

Expand Down Expand Up @@ -328,7 +355,7 @@ func TestDestroyAllErrorCheckpoints(t *testing.T) {

func TestDestroyOneErrorCheckpoint(t *testing.T) {
ctx := context.Background()
cpdb := newFileCheckpointsDB(t)
cpdb := newFileCheckpointsDB(t, false)

setInvalidStatus(cpdb)

Expand Down
Loading