Commit

move waiting reject stores in import file (#222)
* move waiting for reject stores into import files

* restore: use new table id to search placementRules

* Update pkg/restore/import.go

Co-Authored-By: Neil Shen <overvenus@gmail.com>

* Update pkg/restore/import.go

Co-Authored-By: kennytm <kennytm@gmail.com>

* fix ci

Co-authored-by: Neil Shen <overvenus@gmail.com>
Co-authored-by: kennytm <kennytm@gmail.com>
3 people authored Apr 1, 2020
1 parent f7dc2db commit 01de3f5
Showing 7 changed files with 110 additions and 90 deletions.
16 changes: 10 additions & 6 deletions pkg/restore/client.go
@@ -347,15 +347,18 @@ func (rc *Client) CreateTables(
 
 // RemoveTiFlashReplica removes all the tiflash replicas of a table
 // TODO: remove this after tiflash supports restore
-func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, placementRules []placement.Rule) error {
+func (rc *Client) RemoveTiFlashReplica(
+	tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error {
 	schemas := make([]*backup.Schema, 0, len(tables))
 	var updateReplica bool
-	for _, table := range tables {
-		if rule := utils.SearchPlacementRule(table.Info.ID, placementRules, placement.Learner); rule != nil {
+	// must use new table id to search placement rules
+	// here newTables and tables must have same order
+	for i, table := range tables {
+		if rule := utils.SearchPlacementRule(newTables[i].ID, placementRules, placement.Learner); rule != nil {
 			table.TiFlashReplicas = rule.Count
 			updateReplica = true
 		}
-		tableData, err := json.Marshal(table.Info)
+		tableData, err := json.Marshal(newTables[i])
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -454,6 +457,7 @@ func (rc *Client) setSpeedLimit() error {
 func (rc *Client) RestoreFiles(
 	files []*backup.File,
 	rewriteRules *RewriteRules,
+	rejectStoreMap map[uint64]bool,
 	updateCh glue.Progress,
 ) (err error) {
 	start := time.Now()
@@ -486,7 +490,7 @@ func (rc *Client) RestoreFiles(
 		select {
 		case <-rc.ctx.Done():
 			errCh <- rc.ctx.Err()
-		case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
+		case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules):
 			updateCh.Inc()
 		}
 	})
@@ -537,7 +541,7 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil
 		select {
 		case <-rc.ctx.Done():
 			errCh <- rc.ctx.Err()
-		case errCh <- rc.fileImporter.Import(fileReplica, emptyRules):
+		case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules):
 			updateCh.Inc()
 		}
 	})
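The point of this change: tables re-created during restore get fresh table IDs, and placement rules are keyed by table ID, so the lookup must use the new ID from newTables rather than the backed-up table.Info.ID. A minimal sketch of the index-aligned lookup, using hypothetical stand-in types rather than the real br/PD ones:

package main

import "fmt"

// Stand-ins for illustration only; the real code uses *utils.Table,
// *model.TableInfo and placement.Rule.
type oldTable struct {
	name string
	id   int64 // ID recorded in the backup
}

type newTableInfo struct {
	name string
	id   int64 // fresh ID assigned when the table was re-created
}

type rule struct {
	tableID int64
	count   int
}

// searchRule plays the role of utils.SearchPlacementRule, simplified.
func searchRule(id int64, rules []rule) *rule {
	for i := range rules {
		if rules[i].tableID == id {
			return &rules[i]
		}
	}
	return nil
}

func main() {
	tables := []oldTable{{"t1", 100}, {"t2", 101}}
	// CreateTables returns newTables in the same order as tables;
	// that ordering invariant is what makes indexing by i valid.
	newTables := []newTableInfo{{"t1", 200}, {"t2", 201}}
	rules := []rule{{tableID: 201, count: 2}} // rules carry the *new* IDs

	for i := range tables {
		if r := searchRule(newTables[i].id, rules); r != nil {
			fmt.Printf("table %s (old id %d, new id %d): %d tiflash replicas\n",
				tables[i].name, tables[i].id, newTables[i].id, r.count)
		}
	}
}

Searching with the old IDs (100, 101) would find nothing, which is the bug the "use new table id to search placementRules" commit bullet describes. The client_test.go change below locks in the ordering invariant.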
4 changes: 4 additions & 0 deletions pkg/restore/client_test.go
@@ -72,6 +72,10 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
 	}
 	rules, newTables, err := client.CreateTables(s.mock.Domain, tables, 0)
 	c.Assert(err, IsNil)
+	// make sure tables and newTables have same order
+	for i, t := range tables {
+		c.Assert(newTables[i].Name, Equals, t.Info.Name)
+	}
 	for _, nt := range newTables {
 		c.Assert(nt.Name.String(), Matches, "test[0-3]")
 	}
26 changes: 25 additions & 1 deletion pkg/restore/import.go
@@ -175,7 +175,11 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error {
 
 // Import tries to import a file.
 // All rules must contain encoded keys.
-func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error {
+func (importer *FileImporter) Import(
+	file *backup.File,
+	rejectStoreMap map[uint64]bool,
+	rewriteRules *RewriteRules,
+) error {
 	log.Debug("import file", zap.Stringer("file", file))
 	// Rewrite the start key and end key of file to scan regions
 	var startKey, endKey []byte
@@ -193,6 +197,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul
 		zap.Stringer("file", file),
 		zap.Binary("startKey", startKey),
 		zap.Binary("endKey", endKey))
+
+	needReject := len(rejectStoreMap) > 0
+
 	err = utils.WithRetry(importer.ctx, func() error {
 		ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime)
 		defer cancel()
@@ -202,6 +209,23 @@
 		if errScanRegion != nil {
 			return errors.Trace(errScanRegion)
 		}
+
+		if needReject {
+			// TODO remove when TiFlash support restore
+			startTime := time.Now()
+			log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap))
+			for _, region := range regionInfos {
+				if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) {
+					log.Error("waiting for removing rejected stores failed",
+						zap.Stringer("region", region.Region))
+					return errors.New("waiting for removing rejected stores failed")
+				}
+			}
+			log.Info("waiting for removing rejected stores done",
+				zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime)))
+			needReject = false
+		}
+
 		log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
 		// Try to download and ingest the file in every region
 		for _, regionInfo := range regionInfos {
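Worth noting is how needReject interacts with utils.WithRetry: the flag is computed once before the retry closure and cleared after a successful wait, so if a later download or ingest step fails and the closure is re-run, the slow TiFlash wait is not repeated. A self-contained sketch of that run-once-inside-retry shape; withRetry below is a stand-in, not the real utils.WithRetry:

package main

import (
	"errors"
	"fmt"
)

// withRetry re-invokes fn until it succeeds or attempts run out.
func withRetry(fn func() error, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	rejectStores := map[uint64]bool{5: true}
	needReject := len(rejectStores) > 0 // computed once, outside the closure

	ingestAttempts := 0
	err := withRetry(func() error {
		if needReject {
			// The expensive wait runs at most once, even though the
			// closure itself may be retried several times.
			fmt.Println("waiting for rejected stores:", rejectStores)
			needReject = false
		}
		ingestAttempts++
		if ingestAttempts < 3 {
			return errors.New("transient ingest error")
		}
		return nil
	}, 5)
	fmt.Println("err:", err, "ingest attempts:", ingestAttempts)
}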
70 changes: 0 additions & 70 deletions pkg/restore/split.go
@@ -63,7 +63,6 @@ func (rs *RegionSplitter) Split(
 	ctx context.Context,
 	ranges []rtree.Range,
 	rewriteRules *RewriteRules,
-	rejectStores map[uint64]bool,
 	onSplit OnSplitFunc,
 ) error {
 	if len(ranges) == 0 {
@@ -95,14 +94,12 @@
 	}
 	interval := SplitRetryInterval
 	scatterRegions := make([]*RegionInfo, 0)
-	allRegions := make([]*RegionInfo, 0)
 SplitRegions:
 	for i := 0; i < SplitRetryTimes; i++ {
 		regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
 		if errScan != nil {
 			return errors.Trace(errScan)
 		}
-		allRegions = append(allRegions, regions...)
 		if len(regions) == 0 {
 			log.Warn("cannot scan any region")
 			return nil
@@ -145,19 +142,6 @@
 		if errSplit != nil {
 			return errors.Trace(errSplit)
 		}
-		if len(rejectStores) > 0 {
-			startTime = time.Now()
-			log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores))
-			for _, region := range allRegions {
-				if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) {
-					log.Error("waiting for removing rejected stores failed",
-						zap.Stringer("region", region.Region))
-					return errors.New("waiting for removing rejected stores failed")
-				}
-			}
-			log.Info("waiting for removing rejected stores done",
-				zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime)))
-		}
 		log.Info("start to wait for scattering regions",
 			zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
 		startTime = time.Now()
@@ -211,30 +195,6 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID
 	return ok, nil
 }
 
-func (rs *RegionSplitter) hasRejectStorePeer(
-	ctx context.Context,
-	regionID uint64,
-	rejectStores map[uint64]bool,
-) (bool, error) {
-	regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
-	if err != nil {
-		return false, err
-	}
-	if regionInfo == nil {
-		return false, nil
-	}
-	for _, peer := range regionInfo.Region.GetPeers() {
-		if rejectStores[peer.GetStoreId()] {
-			return true, nil
-		}
-	}
-	retryTimes := ctx.Value(retryTimes).(int)
-	if retryTimes > 10 {
-		log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
-	}
-	return false, nil
-}
-
 func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
 	interval := SplitCheckInterval
 	for i := 0; i < SplitCheckMaxRetryTimes; i++ {
@@ -280,36 +240,6 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
 	}
 }
 
-func (rs *RegionSplitter) waitForRemoveRejectStores(
-	ctx context.Context,
-	regionInfo *RegionInfo,
-	rejectStores map[uint64]bool,
-) bool {
-	interval := RejectStoreCheckInterval
-	regionID := regionInfo.Region.GetId()
-	for i := 0; i < RejectStoreCheckRetryTimes; i++ {
-		ctx1 := context.WithValue(ctx, retryTimes, i)
-		ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores)
-		if err != nil {
-			log.Warn("wait for rejecting store failed",
-				zap.Stringer("region", regionInfo.Region),
-				zap.Error(err))
-			return false
-		}
-		// Do not have any peer in the rejected store, return true
-		if !ok {
-			return true
-		}
-		interval = 2 * interval
-		if interval > RejectStoreMaxCheckInterval {
-			interval = RejectStoreMaxCheckInterval
-		}
-		time.Sleep(interval)
-	}
-
-	return false
-}
-
 func (rs *RegionSplitter) splitAndScatterRegions(
 	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
 ) ([]*RegionInfo, error) {
2 changes: 1 addition & 1 deletion pkg/restore/split_test.go
@@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) {
 	regionSplitter := NewRegionSplitter(client)
 
 	ctx := context.Background()
-	err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {})
+	err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
 	if err != nil {
 		c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
 	}
67 changes: 57 additions & 10 deletions pkg/restore/util.go
@@ -21,7 +21,6 @@ import (
 	"github.com/pingcap/tidb/util/codec"
 	"go.uber.org/zap"
 
-	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
 	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/summary"
@@ -332,16 +331,8 @@ func SplitRanges(
 		summary.CollectDuration("split region", elapsed)
 	}()
 	splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
-	tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	storeMap := make(map[uint64]bool)
-	for _, store := range tiflashStores {
-		storeMap[store.GetId()] = true
-	}
 
-	return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) {
+	return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
 		for range keys {
 			updateCh.Inc()
 		}
@@ -416,3 +407,59 @@ func paginateScanRegion(
 	}
 	return regions, nil
 }
+
+func hasRejectStorePeer(
+	ctx context.Context,
+	client SplitClient,
+	regionID uint64,
+	rejectStores map[uint64]bool,
+) (bool, error) {
+	regionInfo, err := client.GetRegionByID(ctx, regionID)
+	if err != nil {
+		return false, err
+	}
+	if regionInfo == nil {
+		return false, nil
+	}
+	for _, peer := range regionInfo.Region.GetPeers() {
+		if rejectStores[peer.GetStoreId()] {
+			return true, nil
+		}
+	}
+	retryTimes := ctx.Value(retryTimes).(int)
+	if retryTimes > 10 {
+		log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
+	}
+	return false, nil
+}
+
+func waitForRemoveRejectStores(
+	ctx context.Context,
+	client SplitClient,
+	regionInfo *RegionInfo,
+	rejectStores map[uint64]bool,
+) bool {
+	interval := RejectStoreCheckInterval
+	regionID := regionInfo.Region.GetId()
+	for i := 0; i < RejectStoreCheckRetryTimes; i++ {
+		ctx1 := context.WithValue(ctx, retryTimes, i)
+		ok, err := hasRejectStorePeer(ctx1, client, regionID, rejectStores)
+		if err != nil {
+			log.Warn("wait for rejecting store failed",
+				zap.Stringer("region", regionInfo.Region),
+				zap.Error(err))
+			return false
+		}
+		// Do not have any peer in the rejected store, return true
+		if !ok {
+			return true
+		}
+		interval = 2 * interval
+		if interval > RejectStoreMaxCheckInterval {
+			interval = RejectStoreMaxCheckInterval
+		}
+		time.Sleep(interval)
+	}
+
+	return false
+}
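The relocated waitForRemoveRejectStores polls each region with a doubling interval capped at RejectStoreMaxCheckInterval. A small sketch of the resulting sleep schedule; the constant values below are assumptions for illustration, since the real constants are defined elsewhere in pkg/restore and not shown in this diff:

package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only; the real RejectStoreCheckInterval,
// RejectStoreMaxCheckInterval and RejectStoreCheckRetryTimes live elsewhere
// in pkg/restore and may differ.
const (
	checkInterval    = 100 * time.Millisecond
	maxCheckInterval = 2 * time.Second
	checkRetryTimes  = 10
)

func main() {
	interval := checkInterval
	var worstCase time.Duration
	for i := 0; i < checkRetryTimes; i++ {
		// Mirrors the loop body above: double first, cap, then sleep.
		// The sleep only happens while a peer still sits on a rejected store.
		interval = 2 * interval
		if interval > maxCheckInterval {
			interval = maxCheckInterval
		}
		worstCase += interval
		fmt.Printf("attempt %d would sleep %v\n", i, interval)
	}
	fmt.Println("worst-case total wait per region:", worstCase)
}

With these assumed values the schedule is 200ms, 400ms, 800ms, 1.6s, then 2s for the remaining attempts, about 15 seconds per region before the wait is reported as failed.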
15 changes: 13 additions & 2 deletions pkg/task/restore.go
@@ -174,7 +174,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 	if err != nil {
 		return err
 	}
-	err = client.RemoveTiFlashReplica(tables, placementRules)
+
+	err = client.RemoveTiFlashReplica(tables, newTables, placementRules)
 	if err != nil {
 		return err
 	}
@@ -222,6 +223,16 @@
 	if batchSize > maxRestoreBatchSizeLimit {
 		batchSize = maxRestoreBatchSizeLimit // 256
 	}
+
+	tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	rejectStoreMap := make(map[uint64]bool)
+	for _, store := range tiflashStores {
+		rejectStoreMap[store.GetId()] = true
+	}
+
 	for {
 		if len(ranges) == 0 {
 			break
@@ -246,7 +257,7 @@
 		}
 
 		// After split, we can restore backup files.
-		err = client.RestoreFiles(fileBatch, rewriteRules, updateCh)
+		err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh)
 		if err != nil {
 			break
 		}
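Since the TiFlash store set does not change per batch, RunRestore now builds the reject map once, before the batch loop, and every RestoreFiles call reuses it. A sketch of the map construction with a hypothetical store type standing in for the store descriptors that conn.GetAllTiKVStores returns:

package main

import "fmt"

// store is a stand-in for the PD store descriptor; only the ID matters here.
type store struct{ id uint64 }

// buildRejectStoreMap mirrors the loop added to RunRestore: every TiFlash
// store ID maps to true, so importers refuse to ingest into regions that
// still have a peer on one of these stores.
func buildRejectStoreMap(tiflashStores []store) map[uint64]bool {
	m := make(map[uint64]bool, len(tiflashStores))
	for _, s := range tiflashStores {
		m[s.id] = true
	}
	return m
}

func main() {
	rejectStoreMap := buildRejectStoreMap([]store{{id: 4}, {id: 7}})
	fmt.Println(rejectStoreMap) // map[4:true 7:true]
}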
