From 01ae5a2a3693ba8a54a4d094f5b06de6f95b324a Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Mon, 4 Sep 2023 20:33:45 +0800 Subject: [PATCH] This is an automated cherry-pick of #46589 Signed-off-by: ti-chi-bot --- br/pkg/restore/client.go | 51 +++++-- br/pkg/restore/client_test.go | 252 ++++++++++++++++++++++++++++++++-- 2 files changed, 281 insertions(+), 22 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 422c4cce3f63e..d0111bea27d3a 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2323,6 +2323,34 @@ func (rc *Client) RestoreMetaKVFiles( } } +<<<<<<< HEAD +======= + failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) { + failpoint.Return(errors.New("failpoint: failed before id maps saved")) + }) + + log.Info("start to restore meta files", + zap.Int("total files", len(files)), + zap.Int("default files", len(filesInDefaultCF)), + zap.Int("write files", len(filesInWriteCF))) + + if schemasReplace.NeedConstructIdMap() { + // Preconstruct the map and save it into external storage. + if err := rc.PreConstructAndSaveIDMap( + ctx, + filesInWriteCF, + filesInDefaultCF, + schemasReplace, + ); err != nil { + return errors.Trace(err) + } + } + failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) { + failpoint.Return(errors.New("failpoint: failed after id maps saved")) + }) + + // run the rewrite and restore meta-kv into TiKV cluster. +>>>>>>> 5319cf7d8a8 (br: fix restore metakv without default cf files (#46589)) if err := rc.RestoreMetaKVFilesWithBatchMethod( ctx, SortMetaKVFiles(filesInDefaultCF), @@ -2380,6 +2408,7 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( if i == 0 { rangeMax = f.MaxTs rangeMin = f.MinTs + batchSize = f.Length } else { if f.MinTs <= rangeMax && batchSize+f.Length <= MetaKVBatchSize { rangeMin = mathutil.Min(rangeMin, f.MinTs) @@ -2412,16 +2441,18 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( writeIdx = toWriteIdx } } - if i == len(defaultFiles)-1 { - _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) - if err != nil { - return errors.Trace(err) - } - _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) - if err != nil { - return errors.Trace(err) - } - } + } + + // restore the left meta kv files and entries + // Notice: restoreBatch needs to realize the parameter `files` and `kvEntries` might be empty + // Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles) + _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) + if err != nil { + return errors.Trace(err) + } + _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) + if err != nil { + return errors.Trace(err) } return nil diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index f86abe231b13a..0ca4506e9d345 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -632,21 +632,56 @@ func TestDeleteRangeQuery(t *testing.T) { require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") } +<<<<<<< HEAD func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { +======= +func MockEmptySchemasReplace() *stream.SchemasReplace { + dbMap := make(map[stream.UpstreamID]*stream.DBReplace) + return stream.NewSchemasReplace( + dbMap, + true, + nil, + 9527, + filter.All(), + nil, + nil, + nil, + nil, + ) +} + +func TestRestoreBatchMetaKVFiles(t *testing.T) { + client := restore.MockClient(nil) +>>>>>>> 5319cf7d8a8 (br: fix restore metakv without default cf files (#46589)) files := []*backuppb.DataFileInfo{} + // test empty files and entries + next, err := client.RestoreBatchMetaKVFiles(context.Background(), files[0:], nil, make([]*restore.KvEntryWithTS, 0), math.MaxUint64, nil, nil, "") + require.NoError(t, err) + require.Equal(t, 0, len(next)) +} + +func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{} batchCount := 0 client := restore.MockClient(nil) err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), +<<<<<<< HEAD files, files, nil, +======= + files_default, + files_write, + sr, +>>>>>>> 5319cf7d8a8 (br: fix restore metakv without default cf files (#46589)) nil, nil, func( ctx context.Context, - defaultFiles []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -654,16 +689,19 @@ func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { + require.Equal(t, 0, len(entries)) + require.Equal(t, 0, len(files)) batchCount++ return nil, nil }, ) require.Nil(t, err) - require.Equal(t, batchCount, 0) + require.Equal(t, batchCount, 2) } -func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { - files := []*backuppb.DataFileInfo{ +func TestRestoreMetaKVFilesWithBatchMethod2_default_empty(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{ { Path: "f1", MinTs: 100, @@ -671,19 +709,71 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { }, } batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) client := restore.MockClient(nil) err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), +<<<<<<< HEAD files, nil, nil, +======= + files_default, + files_write, + sr, +>>>>>>> 5319cf7d8a8 (br: fix restore metakv without default cf files (#46589)) nil, nil, func( ctx context.Context, - fs []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.DefaultCF, cf) + batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.WriteCF, cf) + } + require.Equal(t, uint64(math.MaxUint64), filterTS) + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 1) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + } + files_write := []*backuppb.DataFileInfo{} + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + sr, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -691,17 +781,155 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { - if len(fs) > 0 { - result[batchCount] = fs + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.WriteCF, cf) batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.DefaultCF, cf) } + require.Equal(t, uint64(math.MaxUint64), filterTS) return nil, nil }, ) require.Nil(t, err) require.Equal(t, batchCount, 1) - require.Equal(t, len(result), 1) - require.Equal(t, result[0], files) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + sr, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } + require.Equal(t, 0, len(entries)) + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) +} + +func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + sr := MockEmptySchemasReplace() + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + sr, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } + require.Equal(t, 0, len(entries)) + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) } func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { @@ -967,13 +1195,13 @@ func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { Path: "f1", MinTs: 100, MaxTs: 120, - Length: 1, + Length: 100, }, { Path: "f2", MinTs: 100, MaxTs: 120, - Length: restore.MetaKVBatchSize, + Length: restore.MetaKVBatchSize - 100, }, { Path: "f3",