br: fix restore metakv without default cf files (#46589) (#46643)
close #46578
ti-chi-bot authored Sep 4, 2023
1 parent 6f1bdf7 commit 3843af5
Showing 2 changed files with 231 additions and 26 deletions.
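Before the diff, a brief orientation: as the removed `i == len(defaultFiles)-1` branch below shows, the final restore of both column families used to run only inside the loop over the default-CF files, so an empty default CF (the case behind #46578) meant the write-CF meta files were never restored. The changed hunk moves that tail flush out of the loop. The following is a simplified, self-contained Go sketch of the corrected batching pattern — names such as `restoreBatched`, `fileInfo`, and the `metaKVBatchSize` value are illustrative stand-ins, not the production API, and the timestamp-driven interleaving of write-CF batches is omitted:

```go
package main

import "fmt"

type fileInfo struct {
	Path   string
	MinTs  uint64
	MaxTs  uint64
	Length uint64
}

// metaKVBatchSize stands in for restore.MetaKVBatchSize; the value is for illustration only.
const metaKVBatchSize uint64 = 64 * 1024 * 1024

// restoreBatched mirrors the corrected control flow: full batches of default-CF
// files are flushed inside the loop, and the leftovers of BOTH column families
// are flushed after the loop, so nothing is skipped when either slice is empty.
func restoreBatched(defaultFiles, writeFiles []fileInfo, flush func(cf string, files []fileInfo)) {
	var (
		rangeMin, rangeMax uint64
		batchSize          uint64
		defaultIdx         int
	)
	for i, f := range defaultFiles {
		if i == 0 {
			rangeMin, rangeMax = f.MinTs, f.MaxTs
			batchSize = f.Length // the first file now counts toward the batch size
			continue
		}
		if f.MinTs <= rangeMax && batchSize+f.Length <= metaKVBatchSize {
			if f.MinTs < rangeMin {
				rangeMin = f.MinTs
			}
			if f.MaxTs > rangeMax {
				rangeMax = f.MaxTs
			}
			batchSize += f.Length
			continue
		}
		// the current batch is full: flush it and start a new one at file i
		flush("default", defaultFiles[defaultIdx:i])
		defaultIdx = i
		rangeMin, rangeMax, batchSize = f.MinTs, f.MaxTs, f.Length
	}
	// flush whatever is left in both CFs; the slices may be empty and flush must
	// accept that — this is the step the old code only performed inside the loop,
	// so an empty default CF skipped it entirely
	flush("default", defaultFiles[defaultIdx:])
	flush("write", writeFiles)
}

func main() {
	// write-CF files exist but the default CF is empty: the tail flush still runs
	writeOnly := []fileInfo{{Path: "w1", MinTs: 100, MaxTs: 120, Length: 1024}}
	restoreBatched(nil, writeOnly, func(cf string, files []fileInfo) {
		fmt.Printf("flush %s cf: %d file(s)\n", cf, len(files))
	})
}
```

The `main` function exercises the empty-default-CF case that the rewritten unit tests in client_test.go below also cover.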
28 changes: 18 additions & 10 deletions br/pkg/restore/client.go
@@ -2864,6 +2864,11 @@ func (rc *Client) RestoreMetaKVFiles(
failpoint.Return(errors.New("failpoint: failed before id maps saved"))
})

log.Info("start to restore meta files",
zap.Int("total files", len(files)),
zap.Int("default files", len(filesInDefaultCF)),
zap.Int("write files", len(filesInWriteCF)))

if schemasReplace.NeedConstructIdMap() {
// Preconstruct the map and save it into external storage.
if err := rc.PreConstructAndSaveIDMap(
@@ -2980,6 +2985,7 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
if i == 0 {
rangeMax = f.MaxTs
rangeMin = f.MinTs
batchSize = f.Length
} else {
if f.MinTs <= rangeMax && batchSize+f.Length <= MetaKVBatchSize {
rangeMin = mathutil.Min(rangeMin, f.MinTs)
@@ -3012,16 +3018,18 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
writeIdx = toWriteIdx
}
}
if i == len(defaultFiles)-1 {
_, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF)
if err != nil {
return errors.Trace(err)
}
_, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF)
if err != nil {
return errors.Trace(err)
}
}
}

// restore the remaining meta kv files and entries
// Notice: restoreBatch must handle the case where the parameters `files` and `kvEntries` are empty
// Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles)
_, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF)
if err != nil {
return errors.Trace(err)
}
_, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF)
if err != nil {
return errors.Trace(err)
}

return nil
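Besides moving the tail flush out of the loop, the hunk above also starts counting the first file of a batch toward `batchSize` (`batchSize = f.Length` when `i == 0`), which appears to be why `TestRestoreMetaKVFilesWithBatchMethod6` further down swaps its file lengths from `1` / `MetaKVBatchSize` to `100` / `MetaKVBatchSize - 100`. A small self-contained sketch of that accounting; the `metaKVBatchSize` constant is an assumed stand-in for `restore.MetaKVBatchSize`, not necessarily its real value:

```go
package main

import "fmt"

// metaKVBatchSize stands in for restore.MetaKVBatchSize; assumed value, for arithmetic only.
const metaKVBatchSize uint64 = 64 * 1024 * 1024

func main() {
	// With the old accounting the first file's Length was never added to
	// batchSize, so a pair like (1, MetaKVBatchSize) shared one batch even
	// though its total exceeded the limit. With `batchSize = f.Length` on
	// i == 0, the adjusted lengths (100, MetaKVBatchSize-100) sum to exactly
	// the limit and still batch together, keeping the expected batch count.
	lengths := []uint64{100, metaKVBatchSize - 100}
	var batchSize uint64
	for i, l := range lengths {
		if i == 0 {
			batchSize = l // fixed accounting: count the first file too
			continue
		}
		if batchSize+l <= metaKVBatchSize {
			batchSize += l
			fmt.Printf("file %d joins the batch, size now %d\n", i, batchSize)
		} else {
			fmt.Printf("batch split before file %d\n", i)
			batchSize = l
		}
	}
}
```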
229 changes: 213 additions & 16 deletions br/pkg/restore/client_test.go
@@ -648,78 +648,275 @@ func MockEmptySchemasReplace() *stream.SchemasReplace {
)
}

func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) {
func TestRestoreBatchMetaKVFiles(t *testing.T) {
client := restore.MockClient(nil)
files := []*backuppb.DataFileInfo{}
// test empty files and entries
next, err := client.RestoreBatchMetaKVFiles(context.Background(), files[0:], nil, make([]*restore.KvEntryWithTS, 0), math.MaxUint64, nil, nil, "")
require.NoError(t, err)
require.Equal(t, 0, len(next))
}

func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) {
files_default := []*backuppb.DataFileInfo{}
files_write := []*backuppb.DataFileInfo{}
batchCount := 0

client := restore.MockClient(nil)
sr := MockEmptySchemasReplace()
err := client.RestoreMetaKVFilesWithBatchMethod(
context.Background(),
files,
files,
files_default,
files_write,
sr,
nil,
nil,
func(
ctx context.Context,
defaultFiles []*backuppb.DataFileInfo,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
entries []*restore.KvEntryWithTS,
filterTS uint64,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
cf string,
) ([]*restore.KvEntryWithTS, error) {
require.Equal(t, 0, len(entries))
require.Equal(t, 0, len(files))
batchCount++
return nil, nil
},
)
require.Nil(t, err)
require.Equal(t, batchCount, 0)
require.Equal(t, batchCount, 2)
}

func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) {
files := []*backuppb.DataFileInfo{
func TestRestoreMetaKVFilesWithBatchMethod2_default_empty(t *testing.T) {
files_default := []*backuppb.DataFileInfo{}
files_write := []*backuppb.DataFileInfo{
{
Path: "f1",
MinTs: 100,
MaxTs: 120,
},
}
batchCount := 0
result := make(map[int][]*backuppb.DataFileInfo)

client := restore.MockClient(nil)
sr := MockEmptySchemasReplace()
err := client.RestoreMetaKVFilesWithBatchMethod(
context.Background(),
files,
files_default,
files_write,
sr,
nil,
nil,
func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
entries []*restore.KvEntryWithTS,
filterTS uint64,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
cf string,
) ([]*restore.KvEntryWithTS, error) {
if len(entries) == 0 && len(files) == 0 {
require.Equal(t, stream.DefaultCF, cf)
batchCount++
} else {
require.Equal(t, 0, len(entries))
require.Equal(t, 1, len(files))
require.Equal(t, uint64(100), files[0].MinTs)
require.Equal(t, stream.WriteCF, cf)
}
require.Equal(t, uint64(math.MaxUint64), filterTS)
return nil, nil
},
)
require.Nil(t, err)
require.Equal(t, batchCount, 1)
}

func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_1(t *testing.T) {
files_default := []*backuppb.DataFileInfo{
{
Path: "f1",
MinTs: 100,
MaxTs: 120,
},
}
files_write := []*backuppb.DataFileInfo{}
batchCount := 0

client := restore.MockClient(nil)
sr := MockEmptySchemasReplace()
err := client.RestoreMetaKVFilesWithBatchMethod(
context.Background(),
files_default,
files_write,
sr,
nil,
nil,
func(
ctx context.Context,
fs []*backuppb.DataFileInfo,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
entries []*restore.KvEntryWithTS,
filterTS uint64,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
cf string,
) ([]*restore.KvEntryWithTS, error) {
if len(fs) > 0 {
result[batchCount] = fs
if len(entries) == 0 && len(files) == 0 {
require.Equal(t, stream.WriteCF, cf)
batchCount++
} else {
require.Equal(t, 0, len(entries))
require.Equal(t, 1, len(files))
require.Equal(t, uint64(100), files[0].MinTs)
require.Equal(t, stream.DefaultCF, cf)
}
require.Equal(t, uint64(math.MaxUint64), filterTS)
return nil, nil
},
)
require.Nil(t, err)
require.Equal(t, batchCount, 1)
require.Equal(t, len(result), 1)
require.Equal(t, result[0], files)
}

func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) {
files_default := []*backuppb.DataFileInfo{
{
Path: "f1",
MinTs: 100,
MaxTs: 120,
Length: restore.MetaKVBatchSize - 1000,
},
{
Path: "f2",
MinTs: 110,
MaxTs: 1100,
Length: restore.MetaKVBatchSize,
},
}
files_write := []*backuppb.DataFileInfo{}
emptyCount := 0
batchCount := 0

client := restore.MockClient(nil)
sr := MockEmptySchemasReplace()
err := client.RestoreMetaKVFilesWithBatchMethod(
context.Background(),
files_default,
files_write,
sr,
nil,
nil,
func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
entries []*restore.KvEntryWithTS,
filterTS uint64,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
cf string,
) ([]*restore.KvEntryWithTS, error) {
if len(entries) == 0 && len(files) == 0 {
// write - write
require.Equal(t, stream.WriteCF, cf)
emptyCount++
if emptyCount == 1 {
require.Equal(t, uint64(110), filterTS)
} else {
require.Equal(t, uint64(math.MaxUint64), filterTS)
}
} else {
// default - default
batchCount++
require.Equal(t, 1, len(files))
require.Equal(t, stream.DefaultCF, cf)
if batchCount == 1 {
require.Equal(t, uint64(100), files[0].MinTs)
require.Equal(t, uint64(110), filterTS)
return nil, nil
}
require.Equal(t, 0, len(entries))
}
return nil, nil
},
)
require.Nil(t, err)
require.Equal(t, batchCount, 2)
require.Equal(t, emptyCount, 2)
}

func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) {
files_default := []*backuppb.DataFileInfo{
{
Path: "f1",
MinTs: 100,
MaxTs: 120,
Length: restore.MetaKVBatchSize - 1000,
},
{
Path: "f2",
MinTs: 110,
MaxTs: 1100,
Length: restore.MetaKVBatchSize,
},
}
files_write := []*backuppb.DataFileInfo{}
emptyCount := 0
batchCount := 0

client := restore.MockClient(nil)
sr := MockEmptySchemasReplace()
err := client.RestoreMetaKVFilesWithBatchMethod(
context.Background(),
files_default,
files_write,
sr,
nil,
nil,
func(
ctx context.Context,
files []*backuppb.DataFileInfo,
schemasReplace *stream.SchemasReplace,
entries []*restore.KvEntryWithTS,
filterTS uint64,
updateStats func(kvCount uint64, size uint64),
progressInc func(),
cf string,
) ([]*restore.KvEntryWithTS, error) {
if len(entries) == 0 && len(files) == 0 {
// write - write
require.Equal(t, stream.WriteCF, cf)
emptyCount++
if emptyCount == 1 {
require.Equal(t, uint64(110), filterTS)
} else {
require.Equal(t, uint64(math.MaxUint64), filterTS)
}
} else {
// default - default
batchCount++
require.Equal(t, 1, len(files))
require.Equal(t, stream.DefaultCF, cf)
if batchCount == 1 {
require.Equal(t, uint64(100), files[0].MinTs)
require.Equal(t, uint64(110), filterTS)
return nil, nil
}
require.Equal(t, 0, len(entries))
}
return nil, nil
},
)
require.Nil(t, err)
require.Equal(t, batchCount, 2)
require.Equal(t, emptyCount, 2)
}

func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) {
@@ -988,13 +1185,13 @@ func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) {
Path: "f1",
MinTs: 100,
MaxTs: 120,
Length: 1,
Length: 100,
},
{
Path: "f2",
MinTs: 100,
MaxTs: 120,
Length: restore.MetaKVBatchSize,
Length: restore.MetaKVBatchSize - 100,
},
{
Path: "f3",
