Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Oct 30, 2023
2 parents 5e5c28f + f9f6bb3 commit f6fe674
Show file tree
Hide file tree
Showing 239 changed files with 18,961 additions and 13,429 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ bazel-testlogs
bazel-tidb
.ijwb/
/oom_record/
*.log.json
2,132 changes: 1,014 additions & 1,118 deletions DEPS.bzl

Large diffs are not rendered by default.

148 changes: 147 additions & 1 deletion br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,152 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) {
log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load()))
}

// EnableDataFSR enables FSR for data volume snapshots
func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) {
snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ)

if len(snapshotsIDsMap) == 0 {
return snapshotsIDsMap, errors.Errorf("empty backup meta")
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ))
resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots enabled FSR")
return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ], targetAZ)
})
}
return snapshotsIDsMap, eg.Wait()
}

// waitDataFSREnabled waits FSR for data volume snapshots are all enabled
func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error {
// Create a map to store the strings as keys
pendingSnapshots := make(map[string]struct{})

// Populate the map with the strings from the array
for _, str := range snapShotIDs {
pendingSnapshots[*str] = struct{}{}
}

log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ))
for {
if len(pendingSnapshots) == 0 {
log.Info("all snapshots fsr enablement is finished", zap.String("available zone", targetAZ))
return nil
}

// check pending snapshots every 1 minute
time.Sleep(1 * time.Minute)
log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots)))
input := &ec2.DescribeFastSnapshotRestoresInput{
Filters: []*ec2.Filter{
{
Name: aws.String("state"),
Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")},
},
{
Name: aws.String("availability-zone"),
Values: []*string{aws.String(targetAZ)},
},
},
}

result, err := e.ec2.DescribeFastSnapshotRestores(input)
if err != nil {
return errors.Trace(err)
}

uncompletedSnapshots := make(map[string]struct{})
for _, fastRestore := range result.FastSnapshotRestores {
_, found := pendingSnapshots[*fastRestore.SnapshotId]
if found {
// Detect some conflict states
if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") {
log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State))
return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State)
}
uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{}
}
}
pendingSnapshots = uncompletedSnapshots
}
}

// DisableDataFSR disables FSR for data volume snapshots
func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error {
if len(snapshotsIDsMap) == 0 {
return nil
}

eg, _ := errgroup.WithContext(context.Background())

for availableZone := range snapshotsIDsMap {
targetAZ := availableZone
eg.Go(func() error {
resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
AvailabilityZones: []*string{&targetAZ},
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
})

if err != nil {
return errors.Trace(err)
}

if len(resp.Unsuccessful) > 0 {
log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
}

log.Info("Disable FSR issued", zap.String("available zone", targetAZ))

return nil
})
}
return eg.Wait()
}

func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string {
var sourceSnapshotIDs = make(map[string][]*string)

if len(meta.TiKVComponent.Stores) == 0 {
return sourceSnapshotIDs
}

for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
for j := range store.Volumes {
oldVol := store.Volumes[j]
// Handle data volume snapshots only
if strings.Compare(oldVol.Type, "storage.data-dir") == 0 {
if specifiedAZ != "" {
sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID)
} else {
sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID)
}
}
}
}

return sourceSnapshotIDs
}

// CreateVolumes create volumes from snapshots
// if err happens in the middle, return half-done result
// returned map: store id -> old volume id -> new volume id
Expand Down Expand Up @@ -377,7 +523,7 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
for len(pendingVolumes) > 0 {
// check every 5 seconds
time.Sleep(5 * time.Second)
log.Info("check pending snapshots", zap.Int("count", len(pendingVolumes)))
log.Info("check pending volumes", zap.Int("count", len(pendingVolumes)))
resp, err := e.ec2.DescribeVolumes(&ec2.DescribeVolumesInput{
VolumeIds: pendingVolumes,
})
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/config/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ func (c *EBSBasedBRMeta) GetStoreCount() uint64 {
return uint64(len(c.TiKVComponent.Stores))
}

func (c *EBSBasedBRMeta) GetTiKVVolumeCount() uint64 {
if c.TiKVComponent == nil || len(c.TiKVComponent.Stores) == 0 {
return 0
}
// Assume TiKV nodes are symmetric
return uint64(len(c.TiKVComponent.Stores[0].Volumes))
}

func (c *EBSBasedBRMeta) String() string {
cfg, err := json.Marshal(c)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ type ExternalEngineConfig struct {
StorageURI string
DataFiles []string
StatFiles []string
MinKey []byte
MaxKey []byte
StartKey []byte
EndKey []byte
SplitKeys [][]byte
RegionSplitSize int64
// TotalFileSize can be an estimated value.
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
minKey []byte
maxKey []byte
startKey []byte
endKey []byte
splitKeys [][]byte
regionSplitSize int64
bufPool *membuf.Pool
Expand All @@ -66,8 +66,8 @@ func NewExternalEngine(
storage storage.ExternalStorage,
dataFiles []string,
statsFiles []string,
minKey []byte,
maxKey []byte,
startKey []byte,
endKey []byte,
splitKeys [][]byte,
regionSplitSize int64,
keyAdapter common.KeyAdapter,
Expand All @@ -82,8 +82,8 @@ func NewExternalEngine(
storage: storage,
dataFiles: dataFiles,
statsFiles: statsFiles,
minKey: minKey,
maxKey: maxKey,
startKey: startKey,
endKey: endKey,
splitKeys: splitKeys,
regionSplitSize: regionSplitSize,
bufPool: membuf.NewPool(),
Expand Down Expand Up @@ -305,8 +305,8 @@ func (e *Engine) ID() string {
}

// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (firstKey []byte, lastKey []byte, err error) {
return e.minKey, e.maxKey, nil
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
}

// SplitRanges split the ranges by split keys provided by external engine.
Expand Down
82 changes: 45 additions & 37 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,41 +206,39 @@ func GetMaxOverlapping(points []Endpoint) int64 {

// SortedKVMeta is the meta of sorted kv.
type SortedKVMeta struct {
MinKey []byte `json:"min-key"`
MaxKey []byte `json:"max-key"`
TotalKVSize uint64 `json:"total-kv-size"`
// seems those 2 fields always generated from MultipleFilesStats,
// maybe remove them later.
DataFiles []string `json:"data-files"`
StatFiles []string `json:"stat-files"`
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"` // exclusive
TotalKVSize uint64 `json:"total-kv-size"`
MultipleFilesStats []MultipleFilesStat `json:"multiple-files-stats"`
}

// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary.
// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary. If the summary
// is empty, it will return a pointer to zero SortedKVMeta.
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
meta := &SortedKVMeta{
MinKey: summary.Min.Clone(),
MaxKey: summary.Max.Clone(),
if summary == nil || (len(summary.Min) == 0 && len(summary.Max) == 0) {
return &SortedKVMeta{}
}
return &SortedKVMeta{
StartKey: summary.Min.Clone(),
EndKey: summary.Max.Clone().Next(),
TotalKVSize: summary.TotalSize,
MultipleFilesStats: summary.MultipleFilesStats,
}
for _, f := range summary.MultipleFilesStats {
for _, filename := range f.Filenames {
meta.DataFiles = append(meta.DataFiles, filename[0])
meta.StatFiles = append(meta.StatFiles, filename[1])
}
}
return meta
}

// Merge merges the other SortedKVMeta into this one.
func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
m.MinKey = NotNilMin(m.MinKey, other.MinKey)
m.MaxKey = NotNilMax(m.MaxKey, other.MaxKey)
m.TotalKVSize += other.TotalKVSize
if len(other.StartKey) == 0 && len(other.EndKey) == 0 {
return
}
if len(m.StartKey) == 0 && len(m.EndKey) == 0 {
*m = *other
return
}

m.DataFiles = append(m.DataFiles, other.DataFiles...)
m.StatFiles = append(m.StatFiles, other.StatFiles...)
m.StartKey = BytesMin(m.StartKey, other.StartKey)
m.EndKey = BytesMax(m.EndKey, other.EndKey)
m.TotalKVSize += other.TotalKVSize

m.MultipleFilesStats = append(m.MultipleFilesStats, other.MultipleFilesStats...)
}
Expand All @@ -250,28 +248,38 @@ func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) {
m.Merge(NewSortedKVMeta(summary))
}

// NotNilMin returns the smallest of a and b, ignoring nil values.
func NotNilMin(a, b []byte) []byte {
if len(a) == 0 {
return b
// GetDataFiles returns all data files in the meta.
func (m *SortedKVMeta) GetDataFiles() []string {
var ret []string
for _, stat := range m.MultipleFilesStats {
for _, files := range stat.Filenames {
ret = append(ret, files[0])
}
}
if len(b) == 0 {
return a
return ret
}

// GetStatFiles returns all stat files in the meta.
func (m *SortedKVMeta) GetStatFiles() []string {
var ret []string
for _, stat := range m.MultipleFilesStats {
for _, files := range stat.Filenames {
ret = append(ret, files[1])
}
}
return ret
}

// BytesMin returns the smallest of byte slice a and b.
func BytesMin(a, b []byte) []byte {
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// NotNilMax returns the largest of a and b, ignoring nil values.
func NotNilMax(a, b []byte) []byte {
if len(a) == 0 {
return b
}
if len(b) == 0 {
return a
}
// BytesMax returns the largest of byte slice a and b.
func BytesMax(a, b []byte) []byte {
if bytes.Compare(a, b) > 0 {
return a
}
Expand Down
Loading

0 comments on commit f6fe674

Please sign in to comment.