Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: update and restore GCLifeTime once when parallel (#220) #224

Merged
merged 3 commits into from
Aug 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 76 additions & 31 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ var (
requiredTiKVVersion = *semver.New("2.1.0")
)

var gcLifeTimeKey struct{}

func init() {
cfg := tidbcfg.GetGlobalConfig()
Expand Down Expand Up @@ -449,6 +448,59 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
}
}

type gcLifeTimeManager struct {
runningJobsLock sync.Mutex
runningJobs int
oriGCLifeTime string
}

func newGCLifeTimeManager() *gcLifeTimeManager {
// Default values of three member are enough to initialize this struct
return &gcLifeTimeManager{}
}

// Pre- and post-condition:
// if m.runningJobs == 0, GC life time has not been increased.
// if m.runningJobs > 0, GC life time has been increased.
// m.runningJobs won't be negative(overflow) since index concurrency is relatively small
func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
m.runningJobsLock.Lock()
defer m.runningJobsLock.Unlock()

if m.runningJobs == 0 {
oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
if err != nil {
return err
}
m.oriGCLifeTime = oriGCLifeTime
err = increaseGCLifeTime(ctx, db)
if err != nil {
return err
}
}
m.runningJobs += 1
return nil
}

// Pre- and post-condition:
// if m.runningJobs == 0, GC life time has been tried to recovered. If this try fails, a warning will be printed.
// if m.runningJobs > 0, GC life time has not been recovered.
// m.runningJobs won't minus to negative since removeOneJob follows a successful addOneJob.
func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB, table string) {
m.runningJobsLock.Lock()
defer m.runningJobsLock.Unlock()

m.runningJobs -= 1
if m.runningJobs == 0 {
err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime)
if err != nil && common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
}
}
}

var gcLifeTimeKey struct{}

func (rc *RestoreController) restoreTables(ctx context.Context) error {
timer := time.Now()
var wg sync.WaitGroup
Expand All @@ -464,11 +516,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)
oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db)
if err != nil {
return err
}
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)

manager := newGCLifeTimeManager()
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
Expand Down Expand Up @@ -1315,17 +1365,18 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto
func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
timer := time.Now()

ori, err := increaseGCLifeTime(ctx, db)
if err != nil {
return nil, errors.Trace(err)
var err error
manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)
if !ok {
return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
}

if err = manager.addOneJob(ctx, db); err != nil {
return nil, err
}

// set it back finally
defer func() {
err = UpdateGCLifeTime(ctx, db, ori)
if err != nil && common.ShouldLogError(err) {
common.AppLogger.Errorf("[%s] update tikv_gc_life_time error %v", table, errors.ErrorStack(err))
}
}()
defer manager.removeOneJob(ctx, db, table)

// ADMIN CHECKSUM TABLE <table>,<table> example.
// mysql> admin checksum table test.t;
Expand All @@ -1347,25 +1398,19 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum,
return &cs, nil
}

func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
// checksum command usually takes a long time to execute,
// so here need to increase the gcLifeTime for single transaction.
// try to get gcLifeTime from context first.
gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
if !ok {
oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
if err != nil {
return "", err
}
} else {
oriGCLifeTime = gcLifeTime
}

// try to get gcLifeTimeManager from context first.
// DoChecksum has assure this getting action success.
manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager)

var increaseGCLifeTime bool
if oriGCLifeTime != "" {
ori, err := time.ParseDuration(oriGCLifeTime)
if manager.oriGCLifeTime != "" {
ori, err := time.ParseDuration(manager.oriGCLifeTime)
if err != nil {
return "", errors.Trace(err)
return errors.Trace(err)
}
if ori < defaultGCLifeTime {
increaseGCLifeTime = true
Expand All @@ -1377,13 +1422,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string,
if increaseGCLifeTime {
err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
if err != nil {
return "", errors.Trace(err)
return err
}
}

failpoint.Inject("IncreaseGCUpdateDuration", func() {})

return oriGCLifeTime, nil
return nil
}

////////////////////////////////////////////////////////////////
Expand Down