Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: update and restore GCLifeTime once when parallel #220

Merged
merged 14 commits into from
Aug 7, 2019
Merged
118 changes: 83 additions & 35 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ var (
// DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop
var DeliverPauser = common.NewPauser()

var gcLifeTimeKey struct{}

func init() {
cfg := tidbcfg.GetGlobalConfig()
cfg.Log.SlowThreshold = 3000
Expand Down Expand Up @@ -474,6 +472,66 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
}
}

// gcLifeTimeManager tracks how many jobs currently rely on the extended
// GC life time, together with the value that has to be restored afterwards.
//
// Every field is a pointer because the manager is stored in a context by
// value and its methods use value receivers; all copies therefore share
// the same lock, counter and saved value.
type gcLifeTimeManager struct {
	// runningJobsLock is held by addOneJob and removeOneJob while they
	// read or modify the two fields below.
	runningJobsLock *sync.Mutex
	// runningJobs is the number of jobs registered through addOneJob that
	// have not yet called removeOneJob.
	runningJobs *int32
	// oriGCLifeTime is the GC life time obtained by the first job; it is
	// written back once the last job has been removed.
	oriGCLifeTime *string
}

func newManager() gcLifeTimeManager {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
var (
gcLifeTimeLock sync.Mutex
runningJobs int32
oriGCLifeTime string
)
return gcLifeTimeManager{
&gcLifeTimeLock,
&runningJobs,
&oriGCLifeTime,
}
}

func (m gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
m.runningJobsLock.Lock()
defer m.runningJobsLock.Unlock()

*m.runningJobs += 1
if *m.runningJobs == 1 {
oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
if err != nil {
return err
zier-one marked this conversation as resolved.
Show resolved Hide resolved
}
*m.oriGCLifeTime = oriGCLifeTime
err = increaseGCLifeTime(ctx, db)
if err != nil {
return err
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
}

// removeOneJob unregisters one job. When the job counter reaches zero the
// saved original GC life time is written back with UpdateGCLifeTime. A
// failure to do so is only logged as a warning, together with the query an
// operator can run manually.
func (m gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) {
	m.runningJobsLock.Lock()
	defer m.runningJobsLock.Unlock()

	*m.runningJobs--
	if *m.runningJobs != 0 {
		// Other jobs are still registered; leave the GC life time alone.
		return
	}

	original := *m.oriGCLifeTime
	err := UpdateGCLifeTime(ctx, db, original)
	if err == nil {
		return
	}

	// Restoring failed: report the statement needed to fix it by hand.
	manualQuery := fmt.Sprintf(
		"UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
		original,
	)
	log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
		zap.String("query", manualQuery),
		log.ShortError(err),
	)
}

// gcLifeTimeKey is a package-level marker whose address (&gcLifeTimeKey) is
// used as the context key under which restoreTables stores the
// gcLifeTimeManager and from which DoChecksum retrieves it.
var gcLifeTimeKey struct{}

func (rc *RestoreController) restoreTables(ctx context.Context) error {
logTask := log.L().Begin(zap.InfoLevel, "restore all tables data")

Expand All @@ -490,11 +548,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}
taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
defer close(taskCh)
oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db)
if err != nil {
return err
}
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)

manager := newManager()
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
Expand Down Expand Up @@ -546,7 +602,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
wg.Wait()
stopPeriodicActions <- struct{}{}

err = restoreErr.Get()
err := restoreErr.Get()
logTask.End(zap.ErrorLevel, err)
return err
}
Expand Down Expand Up @@ -1344,21 +1400,18 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto
// DoChecksum computes the remote checksum of a table.
// table should be in the <db>.<table> format, e.g. foo.bar
func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
ori, err := increaseGCLifeTime(ctx, db)
if err != nil {
return nil, errors.Trace(err)
var err error
manager, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager)
if !ok {
return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
}

if err = manager.addOneJob(ctx, db); err != nil {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

// set it back finally
defer func() {
err := UpdateGCLifeTime(ctx, db, ori)
if err != nil {
query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", ori)
log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
zap.String("query", query),
log.ShortError(err),
)
}
}()
defer manager.removeOneJob(ctx, db)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

task := log.With(zap.String("table", table)).Begin(zap.InfoLevel, "remote checksum")

Expand All @@ -1382,25 +1435,20 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum,
return &cs, nil
}

func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
// The checksum command usually takes a long time to execute,
// so we need to increase the GC life time for this single transaction.
// try to get gcLifeTime from context first.
gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
if !ok {
oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
if err != nil {
return "", err
}
} else {
oriGCLifeTime = gcLifeTime
}

// try to get gcLifeTimeManager from context first.
// DoChecksum has already ensured that this lookup succeeds.
manager, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeManager)
oriGCLifeTime := *manager.oriGCLifeTime

var increaseGCLifeTime bool
if oriGCLifeTime != "" {
ori, err := time.ParseDuration(oriGCLifeTime)
if err != nil {
return "", errors.Trace(err)
return errors.Trace(err)
}
if ori < defaultGCLifeTime {
increaseGCLifeTime = true
Expand All @@ -1412,13 +1460,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string,
if increaseGCLifeTime {
err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
if err != nil {
return "", errors.Trace(err)
return err
}
}

failpoint.Inject("IncreaseGCUpdateDuration", nil)

return oriGCLifeTime, nil
return nil
}

////////////////////////////////////////////////////////////////
Expand Down
72 changes: 68 additions & 4 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package restore

import (
"context"
"sync"
"time"

// "encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -116,6 +119,20 @@ func (s *restoreSuite) TestErrorSummaries(c *C) {
})
}

// MockDoChecksumCtx returns a background context carrying a fresh
// gcLifeTimeManager (unlocked mutex, zero running jobs, empty original GC
// life time) under &gcLifeTimeKey, which is what DoChecksum expects to find
// in the context it is given.
func MockDoChecksumCtx() context.Context {
	manager := gcLifeTimeManager{
		runningJobsLock: new(sync.Mutex),
		runningJobs:     new(int32),
		oriGCLifeTime:   new(string),
	}
	return context.WithValue(context.Background(), &gcLifeTimeKey, manager)
}

func (s *restoreSuite) TestDoChecksum(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -136,7 +153,7 @@ func (s *restoreSuite) TestDoChecksum(c *C) {
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := context.Background()
ctx := MockDoChecksumCtx()
checksum, err := DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Expand All @@ -148,6 +165,53 @@ func (s *restoreSuite) TestDoChecksum(c *C) {
})
}

func (s *restoreSuite) TestDoChecksumParallel(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("100h0m0s").
WillReturnResult(sqlmock.NewResult(1, 1))
for i := 0; i < 5; i++ {
mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E").
WillDelayFor(100 * time.Millisecond).
WillReturnRows(
sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}).
AddRow("test", "t", 8520875019404689597, 7296873, 357601387),
)
}
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx()

var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
checksum, err := DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Table: "t",
Checksum: 8520875019404689597,
TotalKVs: 7296873,
TotalBytes: 357601387,
})
wg.Done()
}()
}
wg.Wait()
db.Close()
err = mock.ExpectationsWereMet()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
}

func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -161,7 +225,7 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx()
_, err = DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")
c.Assert(mock.ExpectationsWereMet(), IsNil)
Expand Down Expand Up @@ -379,7 +443,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx()
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890))
c.Assert(err, IsNil)

Expand All @@ -406,7 +470,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx()
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680))
c.Assert(err, ErrorMatches, "checksum mismatched.*")

Expand Down