Skip to content

Commit

Permalink
br: make download metadata concurrency adjustable (#45639)
Browse files Browse the repository at this point in the history
close #45511
  • Loading branch information
Leavrth authored Aug 7, 2023
1 parent 58ac0e6 commit 6ad49e7
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 123 deletions.
4 changes: 3 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,11 +2256,13 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,

MetadataDownloadBatchSize: metadataDownloadBatchSize,
}
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
"go.uber.org/zap"
)

const (
readMetaConcurrency = 128
readMetaBatchSize = 512
)

// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

Expand Down Expand Up @@ -64,13 +59,17 @@ type logFileManager struct {

storage storage.ExternalStorage
helper *stream.MetadataHelper

metadataDownloadBatchSize uint
}

// LogFileManagerInit is the config needed for initializing the log file manager.
type LogFileManagerInit struct {
StartTS uint64
RestoreTS uint64
Storage storage.ExternalStorage

MetadataDownloadBatchSize uint
}

type DDLMetaGroup struct {
Expand All @@ -86,6 +85,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(),

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
err := fm.loadShiftTS(ctx)
if err != nil {
Expand All @@ -104,7 +105,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
Expand Down Expand Up @@ -173,8 +174,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
}
return meta, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
reader := iter.Transform(namesIter, readMeta,
iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
return reader, nil
}

Expand Down
5 changes: 5 additions & 0 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,

MetadataDownloadBatchSize: 32,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
Expand Down Expand Up @@ -301,6 +303,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.MetadataDownloadBatchSize = 128
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
Expand Down Expand Up @@ -460,6 +463,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
StartTS: start,
RestoreTS: end,
Storage: loc,

MetadataDownloadBatchSize: 32,
})
req.NoError(err)

Expand Down
7 changes: 4 additions & 3 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type StreamMetadataSet struct {

// keeps the meta-information of metadata as little as possible
// to save the memory
metadataInfos map[string]*MetadataInfo
metadataInfos map[string]*MetadataInfo
MetadataDownloadBatchSize uint

// a parser of metadata
Helper *stream.MetadataHelper
Expand Down Expand Up @@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
metadataMap.metas = make(map[string]*MetadataInfo)
// `shiftUntilTS` must be less than `until`
metadataMap.shiftUntilTS = until
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := ms.Helper.ParseToMetadataHard(raw)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
item []string
sync.Mutex
}
worker := utils.NewWorkerPool(128, "delete files")
worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
eg, cx := errgroup.WithContext(ctx)
for path, metaInfo := range ms.metadataInfos {
path := path
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
require.NoError(t, fakeStreamBackup(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
require.NoError(t, fakeStreamBackupV2(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/stream/stream_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
streamBackupMetaPrefix = "v1/backupmeta"

streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"

metaDataWorkerPoolSize = 128
)

func GetStreamBackupMetaPrefix() string {
Expand Down Expand Up @@ -311,9 +309,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
func FastUnmarshalMetaData(
ctx context.Context,
s storage.ExternalStorage,
metaDataWorkerPoolSize uint,
fn func(path string, rawMetaData []byte) error,
) error {
log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 20,
shard_count = 18,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand Down
18 changes: 18 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
flagCipherKey = "crypter.key"
flagCipherKeyFile = "crypter.key-file"

flagMetadataDownloadBatchSize = "metadata-download-batch-size"
defaultMetadataDownloadBatchSize = 128

unlimited = 0
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
Expand Down Expand Up @@ -245,6 +248,9 @@ type Config struct {

// KeyspaceName is the name of the keyspace of the task
KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"`

// Metadata download batch size, such as metadata for log restore
MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -294,6 +300,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")

flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
}

Expand Down Expand Up @@ -582,6 +593,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

Expand Down Expand Up @@ -742,6 +757,9 @@ func (cfg *Config) adjust() {
if cfg.ChecksumConcurrency == 0 {
cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
}
if cfg.MetadataDownloadBatchSize == 0 {
cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
}
}

func normalizePDURL(pd string, useTLS bool) (string, error) {
Expand Down
44 changes: 4 additions & 40 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
if err != nil {
Expand Down Expand Up @@ -1300,7 +1301,7 @@ func restoreStream(
}()
}

err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -1628,43 +1629,6 @@ func getFullBackupTS(
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

func getGlobalResolvedTS(
ctx context.Context,
s storage.ExternalStorage,
helper *stream.MetadataHelper,
) (uint64, error) {
storeMap := struct {
sync.Mutex
resolvedTSMap map[int64]uint64
}{}
storeMap.resolvedTSMap = make(map[int64]uint64)
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
m, err := helper.ParseToMetadata(raw)
if err != nil {
return err
}
storeMap.Lock()
if resolveTS, exist := storeMap.resolvedTSMap[m.StoreId]; !exist || resolveTS < m.ResolvedTs {
storeMap.resolvedTSMap[m.StoreId] = m.ResolvedTs
}
storeMap.Unlock()
return nil
})
if err != nil {
return 0, errors.Trace(err)
}
var globalCheckpointTS uint64 = 0
// If V3 global-checkpoint advance, the maximum value in storeMap.resolvedTSMap as global-checkpoint-ts.
// If v2 global-checkpoint advance, it need the minimal value in storeMap.resolvedTSMap as global-checkpoint-ts.
// Because each of store maintains own checkpoint-ts only.
for _, resolveTS := range storeMap.resolvedTSMap {
if globalCheckpointTS < resolveTS {
globalCheckpointTS = resolveTS
}
}
return globalCheckpointTS, nil
}

func parseFullBackupTablesStorage(
ctx context.Context,
cfg *RestoreConfig,
Expand Down
Loading

0 comments on commit 6ad49e7

Please sign in to comment.