Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: update and restore GCLifeTime once when parallel #220

Merged
merged 14 commits into from
Aug 7, 2019
Merged
83 changes: 56 additions & 27 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ var (
// DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop
var DeliverPauser = common.NewPauser()

var gcLifeTimeKey struct{}

func init() {
cfg := tidbcfg.GetGlobalConfig()
cfg.Log.SlowThreshold = 3000
Expand Down Expand Up @@ -474,6 +472,18 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan
}
}

// gcLifeTimeHelper is the value stored in the context under &gcLifeTimeKey.
// DoChecksum reads it to decide when the GC life time has to be increased
// and when it has to be set back.
//
// The helper is built with unkeyed composite literals, so the order of the
// fields below must not change.
type gcLifeTimeHelper struct {
// increaseGCLock is held by DoChecksum while it increments runningJobs and,
// for the first running job, increases the GC life time.
increaseGCLock *sync.Mutex
// runningJobs points to the number of DoChecksum calls currently in flight.
// The GC life time is set back to oriGCLifeTime when it drops to zero.
runningJobs *int32
// oriGCLifeTime is the GC life time obtained before any checksum started;
// it is the value written back once the last running job finishes.
oriGCLifeTime string
}

var (
// gcLifeTimeKey is the variable whose address (&gcLifeTimeKey) is used as
// the context key under which the gcLifeTimeHelper is stored.
// NOTE(review): this is a zero-size variable; the Go spec allows pointers
// to distinct zero-size variables to compare equal, so a dedicated key
// type may be a safer choice — confirm before changing callers.
gcLifeTimeKey struct{}
// gcLifeTimeLock is the package-level mutex handed to the gcLifeTimeHelper
// by restoreTables. It is shared so that multiple `restoreTables` jobs
// working against the same TiDB all go through one single lock.
gcLifeTimeLock sync.Mutex
)

func (rc *RestoreController) restoreTables(ctx context.Context) error {
logTask := log.L().Begin(zap.InfoLevel, "restore all tables data")

Expand All @@ -494,7 +504,13 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
if err != nil {
return err
}
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime)
var runningJobs int32
helper := gcLifeTimeHelper{
&gcLifeTimeLock,
&runningJobs,
oriGCLifeTime,
}
ctx2 := context.WithValue(ctx, &gcLifeTimeKey, helper)
for i := 0; i < rc.cfg.App.IndexConcurrency; i++ {
go func() {
for task := range taskCh {
Expand Down Expand Up @@ -1344,19 +1360,37 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto
// DoChecksum do checksum for tables.
// table should be in <db>.<table>, format. e.g. foo.bar
func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) {
ori, err := increaseGCLifeTime(ctx, db)
if err != nil {
return nil, errors.Trace(err)
var err error
helper, ok := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper)
if !ok {
return nil, errors.New("No gcLifeTimeHelper found in context, check context initialization")
}

helper.increaseGCLock.Lock()
*(helper.runningJobs) += 1
if *(helper.runningJobs) == 1 {
err = increaseGCLifeTime(ctx, db)
if err != nil {
helper.increaseGCLock.Unlock()
return nil, err
}
}
helper.increaseGCLock.Unlock()

// set it back finally
defer func() {
err := UpdateGCLifeTime(ctx, db, ori)
if err != nil {
query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", ori)
log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
zap.String("query", query),
log.ShortError(err),
)
if atomic.AddInt32(helper.runningJobs, -1) == 0 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
err := UpdateGCLifeTime(ctx, db, helper.oriGCLifeTime)
if err != nil {
query := fmt.Sprintf(
"UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
helper.oriGCLifeTime,
)
log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed",
zap.String("query", query),
log.ShortError(err),
)
}
}
}()

Expand All @@ -1382,25 +1416,20 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum,
return &cs, nil
}

func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) {
func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) {
// checksum command usually takes a long time to execute,
// so here need to increase the gcLifeTime for single transaction.
// try to get gcLifeTime from context first.
gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string)
if !ok {
oriGCLifeTime, err = ObtainGCLifeTime(ctx, db)
if err != nil {
return "", err
}
} else {
oriGCLifeTime = gcLifeTime
}

// fetch the gcLifeTimeHelper from the context.
// DoChecksum has already verified that the helper is present, so this lookup is expected to succeed.
helper, _ := ctx.Value(&gcLifeTimeKey).(gcLifeTimeHelper)
oriGCLifeTime := helper.oriGCLifeTime

var increaseGCLifeTime bool
if oriGCLifeTime != "" {
ori, err := time.ParseDuration(oriGCLifeTime)
if err != nil {
return "", errors.Trace(err)
return errors.Trace(err)
}
if ori < defaultGCLifeTime {
increaseGCLifeTime = true
Expand All @@ -1412,13 +1441,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string,
if increaseGCLifeTime {
err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
if err != nil {
return "", errors.Trace(err)
return err
}
}

failpoint.Inject("IncreaseGCUpdateDuration", nil)

return oriGCLifeTime, nil
return nil
}

////////////////////////////////////////////////////////////////
Expand Down
74 changes: 70 additions & 4 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ package restore

import (
"context"
"database/sql"
"sync"
"time"

// "encoding/json"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -116,6 +120,21 @@ func (s *restoreSuite) TestErrorSummaries(c *C) {
})
}

// MockDoChecksumCtx returns a background context carrying a gcLifeTimeHelper
// under &gcLifeTimeKey, which DoChecksum requires to be present.
//
// The original GC life time is read through db with ObtainGCLifeTime, so the
// caller has to make sure db can answer that query; a failure of the lookup
// fails the test via c.
func MockDoChecksumCtx(c *C, db *sql.DB) context.Context {
	base := context.Background()

	original, err := ObtainGCLifeTime(base, db)
	c.Assert(err, IsNil)

	// Every call gets its own lock and job counter. These are fresh local
	// values, not the package-level gcLifeTimeLock.
	state := gcLifeTimeHelper{
		increaseGCLock: new(sync.Mutex),
		runningJobs:    new(int32),
		oriGCLifeTime:  original,
	}
	return context.WithValue(base, &gcLifeTimeKey, state)
}

func (s *restoreSuite) TestDoChecksum(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -136,7 +155,7 @@ func (s *restoreSuite) TestDoChecksum(c *C) {
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := context.Background()
ctx := MockDoChecksumCtx(c, db)
checksum, err := DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Expand All @@ -148,6 +167,53 @@ func (s *restoreSuite) TestDoChecksum(c *C) {
})
}

func (s *restoreSuite) TestDoChecksumParallel(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()

mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("100h0m0s").
WillReturnResult(sqlmock.NewResult(1, 1))
for i := 0; i < 5; i++ {
mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E").
WillDelayFor(100 * time.Millisecond).
WillReturnRows(
sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}).
AddRow("test", "t", 8520875019404689597, 7296873, 357601387),
)
}
mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))
mock.ExpectClose()

ctx := MockDoChecksumCtx(c, db)

var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
checksum, err := DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Table: "t",
Checksum: 8520875019404689597,
TotalKVs: 7296873,
TotalBytes: 357601387,
})
wg.Done()
}()
}
wg.Wait()
db.Close()
err = mock.ExpectationsWereMet()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
}

func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand All @@ -161,7 +227,7 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
WithArgs("300h").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx(c, db)
_, err = DoChecksum(ctx, db, "`test`.`t`")
c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")
c.Assert(mock.ExpectationsWereMet(), IsNil)
Expand Down Expand Up @@ -379,7 +445,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx(c, db)
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890))
c.Assert(err, IsNil)

Expand All @@ -406,7 +472,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) {
WithArgs("10m").
WillReturnResult(sqlmock.NewResult(2, 1))

ctx := context.Background()
ctx := MockDoChecksumCtx(c, db)
err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680))
c.Assert(err, ErrorMatches, "checksum mismatched.*")

Expand Down