diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 665c6171b..02d60ea53 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -77,8 +77,6 @@ var ( // DeliverPauser is a shared pauser to pause progress to (*chunkRestore).encodeLoop var DeliverPauser = common.NewPauser() -var gcLifeTimeKey struct{} - func init() { cfg := tidbcfg.GetGlobalConfig() cfg.Log.SlowThreshold = 3000 @@ -474,6 +472,66 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan } } +type gcLifeTimeManager struct { + runningJobsLock sync.Mutex + runningJobs int + oriGCLifeTime string +} + +func newGCLifeTimeManager() *gcLifeTimeManager { + // Default values of three member are enough to initialize this struct + return &gcLifeTimeManager{} +} + +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has not been increased. +// if m.runningJobs > 0, GC life time has been increased. +// m.runningJobs won't be negative(overflow) since index concurrency is relatively small +func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + if m.runningJobs == 0 { + oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) + if err != nil { + return err + } + m.oriGCLifeTime = oriGCLifeTime + err = increaseGCLifeTime(ctx, db) + if err != nil { + return err + } + } + m.runningJobs += 1 + return nil +} + +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has been tried to recovered. If this try fails, a warning will be printed. +// if m.runningJobs > 0, GC life time has not been recovered. +// m.runningJobs won't minus to negative since removeOneJob follows a successful addOneJob. +func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + m.runningJobs -= 1 + if m.runningJobs == 0 { + err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime) + if err != nil { + query := fmt.Sprintf( + "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", + m.oriGCLifeTime, + ) + log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", + zap.String("query", query), + log.ShortError(err), + ) + } + } +} + +var gcLifeTimeKey struct{} + func (rc *RestoreController) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") @@ -490,11 +548,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { } taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - oriGCLifeTime, err := ObtainGCLifeTime(ctx, rc.tidbMgr.db) - if err != nil { - return err - } - ctx2 := context.WithValue(ctx, &gcLifeTimeKey, oriGCLifeTime) + + manager := newGCLifeTimeManager() + ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager) for i := 0; i < rc.cfg.App.IndexConcurrency; i++ { go func() { for task := range taskCh { @@ -546,7 +602,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { wg.Wait() stopPeriodicActions <- struct{}{} - err = restoreErr.Get() + err := restoreErr.Get() logTask.End(zap.ErrorLevel, err) return err } @@ -1344,21 +1400,18 @@ func setSessionConcurrencyVars(ctx context.Context, db *sql.DB, dsn config.DBSto // DoChecksum do checksum for tables. // table should be in ., format. e.g. foo.bar func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) { - ori, err := increaseGCLifeTime(ctx, db) - if err != nil { - return nil, errors.Trace(err) + var err error + manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) + if !ok { + return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") + } + + if err = manager.addOneJob(ctx, db); err != nil { + return nil, err } + // set it back finally - defer func() { - err := UpdateGCLifeTime(ctx, db, ori) - if err != nil { - query := fmt.Sprintf("UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", ori) - log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", - zap.String("query", query), - log.ShortError(err), - ) - } - }() + defer manager.removeOneJob(ctx, db) task := log.With(zap.String("table", table)).Begin(zap.InfoLevel, "remote checksum") @@ -1382,25 +1435,19 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, return &cs, nil } -func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) { +func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { // checksum command usually takes a long time to execute, // so here need to increase the gcLifeTime for single transaction. - // try to get gcLifeTime from context first. - gcLifeTime, ok := ctx.Value(&gcLifeTimeKey).(string) - if !ok { - oriGCLifeTime, err = ObtainGCLifeTime(ctx, db) - if err != nil { - return "", err - } - } else { - oriGCLifeTime = gcLifeTime - } + + // try to get gcLifeTimeManager from context first. + // DoChecksum has assure this getting action success. + manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) var increaseGCLifeTime bool - if oriGCLifeTime != "" { - ori, err := time.ParseDuration(oriGCLifeTime) + if manager.oriGCLifeTime != "" { + ori, err := time.ParseDuration(manager.oriGCLifeTime) if err != nil { - return "", errors.Trace(err) + return errors.Trace(err) } if ori < defaultGCLifeTime { increaseGCLifeTime = true @@ -1412,13 +1459,13 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, if increaseGCLifeTime { err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String()) if err != nil { - return "", errors.Trace(err) + return err } } failpoint.Inject("IncreaseGCUpdateDuration", nil) - return oriGCLifeTime, nil + return nil } //////////////////////////////////////////////////////////////// diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index abc7025cb..563a33144 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -15,6 +15,9 @@ package restore import ( "context" + "sync" + "time" + // "encoding/json" "fmt" "io/ioutil" @@ -116,10 +119,15 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { }) } +func MockDoChecksumCtx() context.Context { + ctx := context.Background() + manager := newGCLifeTimeManager() + return context.WithValue(ctx, &gcLifeTimeKey, manager) +} + func (s *restoreSuite) TestDoChecksum(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -136,7 +144,7 @@ func (s *restoreSuite) TestDoChecksum(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := context.Background() + ctx := MockDoChecksumCtx() checksum, err := DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, IsNil) c.Assert(*checksum, DeepEquals, RemoteChecksum{ @@ -146,12 +154,98 @@ func (s *restoreSuite) TestDoChecksum(c *C) { TotalKVs: 7296873, TotalBytes: 357601387, }) + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *restoreSuite) TestDoChecksumParallel(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnResult(sqlmock.NewResult(1, 1)) + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). + WillDelayFor(100 * time.Millisecond). + WillReturnRows( + sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). + AddRow("test", "t", 8520875019404689597, 7296873, 357601387), + ) + } + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx() + + // db.Close() will close all connections from its idle pool, set it 1 to expect one close + db.SetMaxIdleConns(1) + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + defer wg.Done() + checksum, err := DoChecksum(ctx, db, "`test`.`t`") + c.Assert(err, IsNil) + c.Assert(*checksum, DeepEquals, RemoteChecksum{ + Schema: "test", + Table: "t", + Checksum: 8520875019404689597, + TotalKVs: 7296873, + TotalBytes: 357601387, + }) + }() + } + wg.Wait() + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnError(errors.Annotate(context.Canceled, "update gc error")) + } + // This recover GC Life Time SQL should not be executed in DoChecksum + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx() + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + _, err = DoChecksum(ctx, db, "`test`.`t`") + c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled") + wg.Done() + }() + } + wg.Wait() + + _, err = db.Exec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E", "10m") + c.Assert(err, IsNil) + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) } func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("300h")) @@ -160,18 +254,19 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). WithArgs("300h"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() - ctx := context.Background() + ctx := MockDoChecksumCtx() _, err = DoChecksum(ctx, db, "`test`.`t`") c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*") + + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectExec( `SET\s+`+ @@ -181,6 +276,7 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { `SESSION tidb_checksum_table_concurrency = \?`). WithArgs(123, 456, 789, 543). WillReturnResult(sqlmock.NewResult(1, 4)) + mock.ExpectClose() ctx := context.Background() setSessionConcurrencyVars(ctx, db, config.DBStore{ @@ -190,8 +286,8 @@ func (s *restoreSuite) TestSetSessionConcurrencyVars(c *C) { ChecksumTableConcurrency: 543, }) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } var _ = Suite(&tableRestoreSuite{}) @@ -363,7 +459,6 @@ func (s *tableRestoreSuite) TestInitializeColumns(c *C) { func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("SELECT.*tikv_gc_life_time.*"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -378,19 +473,20 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { mock.ExpectExec("UPDATE.*tikv_gc_life_time.*"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() - ctx := context.Background() + ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890)) c.Assert(err, IsNil) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() + } func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectQuery("SELECT.*tikv_gc_life_time.*"). WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) @@ -405,29 +501,30 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c *C) { mock.ExpectExec("UPDATE.*tikv_gc_life_time.*"). WithArgs("10m"). WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() - ctx := context.Background() + ctx := MockDoChecksumCtx() err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680)) c.Assert(err, ErrorMatches, "checksum mismatched.*") + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *tableRestoreSuite) TestAnalyzeTable(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - defer db.Close() mock.ExpectExec("ANALYZE TABLE `db`\\.`table`"). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() ctx := context.Background() err = s.tr.analyzeTable(ctx, db) c.Assert(err, IsNil) + c.Assert(db.Close(), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) - mock.ExpectClose() } func (s *tableRestoreSuite) TestImportKVSuccess(c *C) {