Merge pull request pingcap#7 from 3pointer/address_comments
Address comments
fengou1 authored Sep 2, 2022
2 parents 96e0cb6 + 64319d7 commit b174bfb
Showing 3 changed files with 69 additions and 72 deletions.
2 changes: 1 addition & 1 deletion br/pkg/config/ebs.go
@@ -200,4 +200,4 @@ func (c *EBSBasedBRMeta) SetVolumeAZs(idMap map[string]string) {
volume.VolumeAZ = idMap[volume.ID]
}
}
}
}
106 changes: 52 additions & 54 deletions br/pkg/restore/data.go
@@ -28,10 +28,10 @@ import (
// In the future, the number of TiKV stores may grow large; this constant caps the connection pool to TiKV.
// To our knowledge, 128 is good enough for now.
const (
max_store_concurency = 128
maxStoreConcurrency = 128
)

// recover the tikv cluster
// RecoverData recovers the TiKV cluster
// 1. read all region metadata from the TiKV stores
// 2. build the recovery plan, then recover the max allocated ID first
// 3. send the recovery plan and wait for TiKV to apply it; during wait-apply, every assigned region leader checks that its log is applied up to the last entry
@@ -103,7 +103,7 @@ func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progres
progress: progress}
}

func (recovery *Recovery) newTiKVRecoveryClient(ctx context.Context, tikvAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
// Connect to the Recovery service on the given TiKV node.
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
@@ -115,7 +115,7 @@ func (recovery *Recovery) newTiKVRecoveryClient(ctx context.Context, tikvAddr st
//keepaliveConf keepalive.ClientParameters
conn, err := grpc.DialContext(
ctx,
tikvAddr,
storeAddr,
opt,
grpc.WithBlock(),
grpc.FailOnNonTempDialError(true),
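
The dial sequence above is truncated by the diff view. For reference, a minimal, self-contained sketch of dialing a store with a custom backoff, modeled on the calls visible here — the address, the MaxDelay value, and the insecure-credentials option are placeholder assumptions, since opt and gRPCBackOffMaxDelay are defined outside this hunk:

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/backoff"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    bfConf := backoff.DefaultConfig
    bfConf.MaxDelay = 3 * time.Second // placeholder for gRPCBackOffMaxDelay

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    conn, err := grpc.DialContext(
        ctx,
        "127.0.0.1:20160", // placeholder store address
        grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
        grpc.WithBlock(),                  // block until connected or ctx times out
        grpc.FailOnNonTempDialError(true), // fail fast on permanent dial errors
        grpc.WithTransportCredentials(insecure.NewCredentials()), // placeholder for opt
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    log.Println("connected:", conn.Target())
}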
@@ -148,11 +148,11 @@ func getStoreAddress(allStores []*metapb.Store, storeId uint64) string {
return addr
}

// read all region meta from tikvs
// ReadRegionMeta reads all region metadata from the TiKV stores
func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "Collect Region Meta") // TODO: int overflow?
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?

// TODO: optimize the errgroup handling when a TiKV panics
metaChan := make(chan StoreMeta, 1024)
@@ -167,17 +167,16 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
return errors.Trace(err)
}

tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()

workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("read meta from tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
stream, err := tikvClient.ReadRegionMeta(ectx, &recovpb.ReadRegionMetaRequest{StoreId: storeId})
stream, err := recoveryClient.ReadRegionMeta(ectx, &recovpb.ReadRegionMetaRequest{StoreId: storeId})
if err != nil {
log.Error("read region meta failed", zap.Uint64("storeID", storeId))
log.Error("read region meta failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}

@@ -217,48 +216,48 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {

func (recovery *Recovery) getTotalRegions() int {
// Group region peer info by region id.
var regions = make(map[uint64][]struct{}, 0)
var regions = make(map[uint64]struct{}, 0)
for _, v := range recovery.storeMetas {
for _, m := range v.regionMetas {
if regions[m.RegionId] == nil {
regions[m.RegionId] = make([]struct{}, 0, len(recovery.allStores))
if _, ok := regions[m.RegionId]; !ok {
regions[m.RegionId] = struct{}{}
}
}
}
return len(regions)
}
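
The fix above replaces map[uint64][]struct{}, a map from region ID to a pointlessly allocated slice, with map[uint64]struct{}, Go's idiomatic zero-byte set, so counting distinct regions needs no per-key allocation. A minimal sketch of the idiom:

package main

import "fmt"

func main() {
    // Region IDs as several stores might report them, with duplicates.
    reported := []uint64{101, 102, 102, 103, 101}
    regions := make(map[uint64]struct{})
    for _, id := range reported {
        // struct{}{} carries no data; the map acts as a set, so a
        // region reported by multiple stores is counted once.
        regions[id] = struct{}{}
    }
    fmt.Println(len(regions)) // 3
}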

// send the recovery plan to recovery region (force leader etc)
// RecoverRegions sends the recovery plan to the regions to be recovered (force leader, etc.)
// only TiKV stores that have regions to recover are sent a plan
func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
totalRecoveredStores := len(recovery.recoveryPlan)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalRecoveredStores, max_store_concurency)), "Recover Regions")
workers := utils.NewWorkerPool(uint(mathutil.Min(totalRecoveredStores, maxStoreConcurrency)), "Recover Regions")

for storeId, plan := range recovery.recoveryPlan {
if err := ectx.Err(); err != nil {
return errors.Trace(err)
}

storeAddr := getStoreAddress(recovery.allStores, storeId)
tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("storeID", storeId))
return errors.Trace(err)
}
defer conn.Close()
cmd := plan
storeId := storeId
recoveryPlan := plan
recoveryStoreId := storeId
workers.ApplyOnErrorGroup(eg, func() error {
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
stream, err := tikvClient.RecoverRegion(ectx)
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}
defer conn.Close()
log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
stream, err := recoveryClient.RecoverRegion(ectx)
if err != nil {
log.Error("create recover region failed", zap.Uint64("storeID", storeId))
log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
return errors.Trace(err)
}

// for a single TiKV, send its recovery commands over the stream
for _, s := range cmd {
for _, s := range recoveryPlan {
if err = stream.Send(s); err != nil {
log.Error("send region recovery region failed", zap.Error(err))
return errors.Trace(err)
Expand All @@ -271,75 +270,74 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
return errors.Trace(err)
}
recovery.progress.Inc()
log.Info("recovery region execution success", zap.Uint64("storeID", reply.GetStoreId()))
log.Info("recovery region execution success", zap.Uint64("store id", reply.GetStoreId()))
return nil
})
}
// Wait for all TiKV instances to force leader and apply up to the last log.
return eg.Wait()
}
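
The renames recoveryPlan := plan and recoveryStoreId := storeId keep the per-iteration copies that the closures passed to ApplyOnErrorGroup rely on: before Go 1.22, a range loop reuses one variable per name across iterations, so a goroutine capturing it directly can observe a later iteration's value. A minimal demonstration of the pitfall and the fix:

package main

import (
    "fmt"
    "sync"
)

func main() {
    plans := map[uint64]string{1: "plan-a", 2: "plan-b", 3: "plan-c"}
    var wg sync.WaitGroup
    for storeID, plan := range plans {
        // Shadowing gives each goroutine its own copy. Without this
        // line, a pre-1.22 compiler lets every goroutine share the
        // same variables, and some may print a later iteration's pair.
        storeID, plan := storeID, plan
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println("store", storeID, "gets", plan)
        }()
    }
    wg.Wait()
}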

// send wait apply to all tikv ensure all region peer apply log into the last
// WaitApply sends wait-apply to all TiKV stores to ensure every region peer applies the log up to the last entry
func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "wait apply")
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "wait apply")

for _, store := range recovery.allStores {
if err := ectx.Err(); err != nil {
return errors.Trace(err)
}
storeAddr := getStoreAddress(recovery.allStores, store.Id)
tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
storeId := store.Id

workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("send wait apply to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
req := &recovpb.WaitApplyRequest{StoreId: storeId}
_, err := tikvClient.WaitApply(ectx, req)
_, err = recoveryClient.WaitApply(ectx, req)
if err != nil {
log.Error("wait apply failed", zap.Uint64("storeID", storeId))
log.Error("wait apply failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}

recovery.progress.Inc()
log.Info("recovery wait apply execution success", zap.Uint64("storeID", storeId))
log.Info("recovery wait apply execution success", zap.Uint64("store id", storeId))
return nil
})
}
// Wait for all TiKV instances to finish applying up to the last log.
return eg.Wait()
}
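
Each of these fan-out loops caps concurrency at maxStoreConcurrency through the BR worker pool, and checks ectx.Err() before scheduling so no new work starts once a worker has failed. A minimal sketch of the same bound using only errgroup (SetLimit requires a recent golang.org/x/sync; the store list is made up):

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

const maxStoreConcurrency = 128

func main() {
    storeIDs := make([]uint64, 300)
    for i := range storeIDs {
        storeIDs[i] = uint64(i + 1)
    }

    eg, ectx := errgroup.WithContext(context.Background())
    limit := len(storeIDs)
    if limit > maxStoreConcurrency {
        limit = maxStoreConcurrency
    }
    eg.SetLimit(limit) // at most 128 workers in flight

    for _, id := range storeIDs {
        id := id
        eg.Go(func() error {
            // Bail out cheaply if an earlier worker already failed.
            if err := ectx.Err(); err != nil {
                return err
            }
            fmt.Println("wait apply on store", id)
            return nil
        })
    }
    if err := eg.Wait(); err != nil {
        fmt.Println(err)
    }
}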

// a worker pool to all tikv for execute delete all data whose has ts > resolvedTs
// ResolveData runs a worker pool over all TiKV stores to delete all data with ts > resolvedTs
func (recovery *Recovery) ResolveData(ctx context.Context, resolvedTs uint64) (err error) {

eg, ectx := errgroup.WithContext(ctx)
totalStores := len(recovery.allStores)
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, max_store_concurency)), "resolve data from tikv")
workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, maxStoreConcurrency)), "resolve data from tikv")

// TODO: what if resolving the data takes a long time? It looks like we need some handling here; at least some retry may be necessary
for _, store := range recovery.allStores {
if err := ectx.Err(); err != nil {
return errors.Trace(err)
}
storeAddr := getStoreAddress(recovery.allStores, store.Id)
tikvClient, conn, err := recovery.newTiKVRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
storeId := store.Id
workers.ApplyOnErrorGroup(eg, func() error {
recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
if err != nil {
return errors.Trace(err)
}
defer conn.Close()
log.Info("resolved data to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
req := &recovpb.ResolveKvDataRequest{ResolvedTs: resolvedTs}
stream, err := tikvClient.ResolveKvData(ectx, req)
stream, err := recoveryClient.ResolveKvData(ectx, req)
if err != nil {
log.Error("send the resolve kv data failed", zap.Uint64("storeID", storeId))
log.Error("send the resolve kv data failed", zap.Uint64("store id", storeId))
return errors.Trace(err)
}
// for a single TiKV, receive replies from the stream
@@ -436,12 +434,12 @@ func (recovery *Recovery) makeRecoveryPlan() error {
var ek = prefixEndKey(regions[p.regionId][0].EndKey)
var fk, fv interface{}
fk, _ = topo.Ceiling(sk)
// keysapce overlap sk within ceiling - fk
// keyspace overlap sk within ceiling - fk
if fk != nil && (keyEq(fk.([]byte), sk) || keyCmp(fk.([]byte), ek) < 0) {
continue
}

// keysapce overlap sk within floor - fk.end_key
// keyspace overlap sk within floor - fk.end_key
fk, fv = topo.Floor(sk)
if fk != nil && keyCmp(fv.(RegionEndKey).endKey, sk) > 0 {
continue
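
This hunk only fixes the keysapce → keyspace typo, but the surrounding test is worth spelling out: topo is an ordered map keyed by region start key, and a candidate range [sk, ek) is rejected if the ceiling of sk starts before ek, or the floor of sk ends after sk. A minimal sketch of the same two-probe overlap check over a sorted slice — string keys instead of byte slices, and ignoring the empty-end-key-means-infinity case that prefixEndKey handles:

package main

import (
    "fmt"
    "sort"
)

type span struct{ start, end string }

// overlaps reports whether [sk, ek) intersects any span in topo,
// which must be sorted by start key — the same ceiling/floor probes
// makeRecoveryPlan performs on its treemap.
func overlaps(topo []span, sk, ek string) bool {
    // Ceiling: the first span whose start key is >= sk.
    i := sort.Search(len(topo), func(i int) bool { return topo[i].start >= sk })
    if i < len(topo) && (topo[i].start == sk || topo[i].start < ek) {
        return true
    }
    // Floor: the last span whose start key is <= sk; it overlaps
    // if it extends past sk.
    if i > 0 && topo[i-1].end > sk {
        return true
    }
    return false
}

func main() {
    topo := []span{{"a", "c"}, {"e", "g"}}
    fmt.Println(overlaps(topo, "c", "e")) // false: fits exactly in the gap
    fmt.Println(overlaps(topo, "b", "d")) // true: the floor span a-c covers b
}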
33 changes: 16 additions & 17 deletions br/pkg/task/restore_ebs_meta.go
@@ -173,25 +173,8 @@ func (h *restoreEBSMetaHelper) preRestore(ctx context.Context) error {
}
h.metaInfo = metaInfo

// stop scheduler before recover data
log.Info("starting to remove some PD schedulers")
restoreFunc, e := h.pdc.RemoveAllPDSchedulers(ctx)
if e != nil {
return errors.Trace(err)
}
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
ctx = context.Background()
}
if restoreE := restoreFunc(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}()

// TODO: check whether the target cluster is compatible with the backup;
// but the cluster hasn't bootstrapped, so we cannot get the cluster version from PD yet.

return nil
}

@@ -228,6 +211,22 @@ func (h *restoreEBSMetaHelper) restore() error {
return errors.Trace(err)
}

// stop scheduler before recover data
log.Info("starting to remove some PD schedulers")
restoreFunc, e := h.pdc.RemoveAllPDSchedulers(ctx)
if e != nil {
return errors.Trace(e)
}
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
ctx = context.Background()
}
if restoreE := restoreFunc(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}()
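
Moving the scheduler removal out of preRestore into restore narrows the window in which PD scheduling is disabled to the actual data recovery. The restore-on-exit shape, with a fresh context when the original was canceled, looks like this — pauseSchedulers is a hypothetical stand-in for pdc.RemoveAllPDSchedulers and the restore function it returns:

package main

import (
    "context"
    "log"
)

// pauseSchedulers stands in for pdc.RemoveAllPDSchedulers: it disables
// scheduling and returns a function that re-enables it.
func pauseSchedulers(ctx context.Context) (func(context.Context) error, error) {
    log.Println("schedulers removed")
    return func(ctx context.Context) error {
        log.Println("schedulers restored")
        return nil
    }, nil
}

func restoreData(ctx context.Context) error {
    restoreFunc, err := pauseSchedulers(ctx)
    if err != nil {
        return err
    }
    defer func() {
        // If ctx was canceled, the restore call would fail too; fall
        // back to a background context so the cleanup still runs.
        if ctx.Err() != nil {
            log.Println("context canceled, cleaning up with background context")
            ctx = context.Background()
        }
        if restoreErr := restoreFunc(ctx); restoreErr != nil {
            log.Println("failed to restore schedulers, restore them manually:", restoreErr)
        }
    }()

    // ... recover data while scheduling is paused ...
    return nil
}

func main() { _ = restoreData(context.Background()) }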

storeCount := h.metaInfo.GetStoreCount()
progress := h.g.StartProgress(ctx, h.cmdName, int64(storeCount), !h.cfg.LogProgress)
defer progress.Close()
