From dc17151fe7ad2191891ad0f1ff1aa094f986521b Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 9 Sep 2020 10:34:17 +0800 Subject: [PATCH 1/8] use gc ttl for checksum --- lightning/backend/importer.go | 8 +- lightning/checkpoints/tidb.go | 1 + lightning/common/version.go | 13 ++ lightning/restore/restore.go | 290 ++++++++++++++++++++++++++++-- lightning/restore/restore_test.go | 2 +- lightning/restore/tidb.go | 1 + 6 files changed, 288 insertions(+), 27 deletions(-) diff --git a/lightning/backend/importer.go b/lightning/backend/importer.go index 174d06ef2..9f05999ad 100644 --- a/lightning/backend/importer.go +++ b/lightning/backend/importer.go @@ -292,13 +292,7 @@ func checkTiDBVersion(tls *common.TLS, requiredVersion semver.Version) error { } func checkPDVersion(tls *common.TLS, pdAddr string, requiredVersion semver.Version) error { - var rawVersion string - err := tls.WithHost(pdAddr).GetJSON("/pd/api/v1/config/cluster-version", &rawVersion) - if err != nil { - return err - } - - version, err := semver.NewVersion(rawVersion) + version, err := common.FetchPDVersion(tls, pdAddr) if err != nil { return errors.Trace(err) } diff --git a/lightning/checkpoints/tidb.go b/lightning/checkpoints/tidb.go index 8b98c45be..f53cadd42 100644 --- a/lightning/checkpoints/tidb.go +++ b/lightning/checkpoints/tidb.go @@ -24,6 +24,7 @@ type TidbDBInfo struct { type TidbTableInfo struct { ID int64 + DB string Name string Core *model.TableInfo } diff --git a/lightning/common/version.go b/lightning/common/version.go index a883f3f02..fad832e14 100644 --- a/lightning/common/version.go +++ b/lightning/common/version.go @@ -16,6 +16,8 @@ package common import ( "fmt" + "github.com/coreos/go-semver/semver" + "github.com/pingcap/errors" "go.uber.org/zap" "github.com/pingcap/tidb-lightning/lightning/log" @@ -58,3 +60,14 @@ func PrintInfo(app string, callback func()) { callback() } } + +// FetchPDVersion get pd version +func FetchPDVersion(tls *TLS, pdAddr string) (*semver.Version, error) { + var rawVersion string + err := tls.WithHost(pdAddr).GetJSON("/pd/api/v1/config/cluster-version", &rawVersion) + if err != nil { + return nil, errors.Trace(err) + } + + return semver.NewVersion(rawVersion) +} diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 033a4c35b..524821ef2 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -14,6 +14,7 @@ package restore import ( + "container/heap" "context" "database/sql" "fmt" @@ -24,6 +25,14 @@ import ( "sync/atomic" "time" + "github.com/pingcap/tidb/store/tikv" + + "github.com/pingcap/br/pkg/checksum" + + "github.com/pingcap/tidb/store/tikv/oracle" + + pd "github.com/tikv/pd/client" + "github.com/pingcap/br/pkg/storage" "github.com/pingcap/errors" @@ -31,6 +40,7 @@ import ( sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/parser/model" tidbcfg "github.com/pingcap/tidb/config" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -59,7 +69,9 @@ const ( ) const ( - indexEngineID = -1 + indexEngineID = -1 + preUpdateServiceSafePointFactor = 3 + serviceSafePointTTL = 10 * 60 // 10 min in seconds ) const ( @@ -151,6 +163,7 @@ type RestoreController struct { closedEngineLimit *worker.Pool store storage.ExternalStorage + checksumManager ChecksumManager } func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, s storage.ExternalStorage) (*RestoreController, error) { @@ -649,7 +662,7 
@@ func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { } } -var gcLifeTimeKey struct{} +var checksumManagerKey struct{} func (rc *RestoreController) restoreTables(ctx context.Context) error { logTask := log.L().Begin(zap.InfoLevel, "restore all tables data") @@ -668,8 +681,11 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - manager := newGCLifeTimeManager() - ctx2 := context.WithValue(ctx, &gcLifeTimeKey, manager) + manager, err := rc.newChecksumManager() + if err != nil { + return errors.Trace(err) + } + ctx2 := context.WithValue(ctx, &checksumManagerKey, manager) for i := 0; i < rc.cfg.App.IndexConcurrency; i++ { go func() { for task := range taskCh { @@ -793,11 +809,47 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { wg.Wait() close(stopPeriodicActions) - err := restoreErr.Get() + err = restoreErr.Get() logTask.End(zap.ErrorLevel, err) return err } +func (rc *RestoreController) newChecksumManager() (ChecksumManager, error) { + pdAddr := rc.cfg.TiDB.PdAddr + pdVersion, err := common.FetchPDVersion(rc.tls, pdAddr) + if err != nil { + return nil, errors.Trace(err) + } + + // for v4.0.0 or update, we can use + var manager ChecksumManager + if pdVersion.Major >= 4 { + tlsOpt := rc.tls.ToPDSecurityOption() + pdCli, err := pd.NewClient([]string{pdAddr}, tlsOpt) + if err != nil { + return nil, errors.Trace(err) + } + + if tlsOpt.CAPath != "" { + conf := tidbcfg.GetGlobalConfig() + conf.Security.ClusterSSLCA = tlsOpt.CAPath + conf.Security.ClusterSSLCert = tlsOpt.CertPath + conf.Security.ClusterSSLKey = tlsOpt.KeyPath + tidbcfg.StoreGlobalConfig(conf) + } + store, err := tikv.Driver{}.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr)) + if err != nil { + return nil, errors.Trace(err) + } + + manager = newTiKVChecksumManager(store.(tikv.Storage).GetClient(), pdCli) + } else { + manager = newTiDBChecksumExecutor(rc.tidbMgr.db) + } + + return manager, nil +} + func (t *TableRestore) restoreTable( ctx context.Context, rc *RestoreController, @@ -1567,7 +1619,7 @@ func (tr *TableRestore) importKV(ctx context.Context, closedEngine *kv.ClosedEng // do checksum for each table. func (tr *TableRestore) compareChecksum(ctx context.Context, db *sql.DB, localChecksum verify.KVChecksum) error { - remoteChecksum, err := DoChecksum(ctx, db, tr.tableName) + remoteChecksum, err := DoChecksum(ctx, db, tr.tableInfo) if err != nil { return errors.Trace(err) } @@ -1603,23 +1655,35 @@ type RemoteChecksum struct { TotalBytes uint64 } -// DoChecksum do checksum for tables. -// table should be in ., format. e.g. 
foo.bar -func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, error) { - var err error - manager, ok := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) - if !ok { - return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") +type ChecksumManager interface { + Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) +} + +// fetch checksum for tidb sql client +type tidbChecksumExecutor struct { + db *sql.DB + manager *gcLifeTimeManager +} + +func newTiDBChecksumExecutor(db *sql.DB) *tidbChecksumExecutor { + return &tidbChecksumExecutor{ + db: db, + manager: newGCLifeTimeManager(), } +} - if err = manager.addOneJob(ctx, db); err != nil { +func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + var err error + if err = e.manager.addOneJob(ctx, e.db); err != nil { return nil, err } // set it back finally - defer manager.removeOneJob(ctx, db) + defer e.manager.removeOneJob(ctx, e.db) + + tableName := common.UniqueTable(tableInfo.DB, tableInfo.Name) - task := log.With(zap.String("table", table)).Begin(zap.InfoLevel, "remote checksum") + task := log.With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum") // ADMIN CHECKSUM TABLE
<table>,<table>
example. // mysql> admin checksum table test.t; @@ -1630,8 +1694,8 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, // +---------+------------+---------------------+-----------+-------------+ cs := RemoteChecksum{} - err = common.SQLWithRetry{DB: db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", - "ADMIN CHECKSUM TABLE "+table, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes, + err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", + "ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes, ) dur := task.End(zap.ErrorLevel, err) metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) @@ -1641,13 +1705,31 @@ func DoChecksum(ctx context.Context, db *sql.DB, table string) (*RemoteChecksum, return &cs, nil } +// DoChecksum do checksum for tables. +// table should be in .
, format. e.g. foo.bar +func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteChecksum, error) { + var err error + manager, ok := ctx.Value(&checksumManagerKey).(ChecksumManager) + if !ok { + return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") + } + + task := log.With(zap.String("table", table.Name)).Begin(zap.InfoLevel, "remote checksum") + + cs, err := manager.Checksum(ctx, table) + dur := task.End(zap.ErrorLevel, err) + metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) + + return cs, err +} + func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { // checksum command usually takes a long time to execute, // so here need to increase the gcLifeTime for single transaction. // try to get gcLifeTimeManager from context first. // DoChecksum has assure this getting action success. - manager, _ := ctx.Value(&gcLifeTimeKey).(*gcLifeTimeManager) + manager, _ := ctx.Value(&checksumManagerKey).(*gcLifeTimeManager) var increaseGCLifeTime bool if manager.oriGCLifeTime != "" { @@ -1674,6 +1756,176 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { return nil } +type tikvChecksumManager struct { + client tidbkv.Client + manager gcTTLManager +} + +// newTiKVChecksumManager return a new tikv checksum manager +func newTiKVChecksumManager(client tidbkv.Client, pdClient pd.Client) *tikvChecksumManager { + return &tikvChecksumManager{ + client: client, + manager: gcTTLManager{ + pdClient: pdClient, + }, + } +} + +func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + builder := checksum.NewExecutorBuilder(tableInfo.Core, 0) + executor, err := builder.Build() + if err != nil { + return nil, errors.Trace(err) + } + + res, err := executor.Execute(ctx, e.client, func() {}) + if err != nil { + return nil, errors.Trace(err) + } + + return &RemoteChecksum{ + Schema: tableInfo.DB, + Table: tableInfo.Name, + Checksum: res.Checksum, + TotalBytes: res.TotalBytes, + TotalKVs: res.TotalKvs, + }, nil +} + +func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) + err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0)) + if err != nil { + return nil, errors.Trace(err) + } + + return e.checksumDB(ctx, tableInfo) +} + +type tableChecksumTS struct { + table string + gcSafeTS uint64 +} + +// following function are for implement `heap.Interface` + +func (m *gcTTLManager) Len() int { + return len(m.tableGCSafeTS) +} + +func (m *gcTTLManager) Less(i, j int) bool { + return m.tableGCSafeTS[i].gcSafeTS < m.tableGCSafeTS[j].gcSafeTS +} + +func (m *gcTTLManager) Swap(i, j int) { + m.tableGCSafeTS[i], m.tableGCSafeTS[j] = m.tableGCSafeTS[j], m.tableGCSafeTS[i] +} + +func (m *gcTTLManager) Push(x interface{}) { + m.tableGCSafeTS = append(m.tableGCSafeTS, x.(*tableChecksumTS)) +} + +func (m *gcTTLManager) Pop() interface{} { + i := m.tableGCSafeTS[len(m.tableGCSafeTS)-1] + m.tableGCSafeTS = m.tableGCSafeTS[:len(m.tableGCSafeTS)-1] + return i +} + +type gcTTLManager struct { + lock sync.Mutex + pdClient pd.Client + // tableGCSafeTS is a binary heap that stored active checksum jobs GC safe point ts + tableGCSafeTS []*tableChecksumTS + currentTs uint64 + serviceID string +} + +func (m *gcTTLManager) addOneJob(ctx context.Context, table string, ts uint64) error { + m.lock.Lock() + defer m.lock.Unlock() + var curTs uint64 + if 
len(m.tableGCSafeTS) > 0 { + curTs = m.tableGCSafeTS[0].gcSafeTS + } + m.Push(&tableChecksumTS{table: table, gcSafeTS: ts}) + heap.Fix(m, len(m.tableGCSafeTS)-1) + m.currentTs = m.tableGCSafeTS[0].gcSafeTS + if curTs == 0 || m.currentTs < curTs { + return m.doUpdateGCTTL(ctx, m.currentTs) + } + return nil +} + +func (m *gcTTLManager) removeOneJob(table string) { + m.lock.Lock() + defer m.lock.Unlock() + idx := -1 + for i := 0; i < len(m.tableGCSafeTS); i++ { + if m.tableGCSafeTS[i].table == table { + idx = i + break + } + } + + if idx >= 0 { + l := len(m.tableGCSafeTS) + m.tableGCSafeTS[idx] = m.tableGCSafeTS[l-1] + m.tableGCSafeTS = m.tableGCSafeTS[:l-1] + if l > 1 && idx < l-1 { + heap.Fix(m, idx) + } + } + + var newTs uint64 + if len(m.tableGCSafeTS) > 0 { + newTs = m.tableGCSafeTS[0].gcSafeTS + } + m.currentTs = newTs +} + +func (m *gcTTLManager) updateGCTTL(ctx context.Context) error { + m.lock.Lock() + currentTs := m.currentTs + m.lock.Unlock() + return m.doUpdateGCTTL(ctx, currentTs) +} + +func (m *gcTTLManager) doUpdateGCTTL(ctx context.Context, ts uint64) error { + log.L().Debug("update PD safePoint limit with TTL", + zap.Uint64("currnet_ts", ts)) + var err error + if ts > 0 { + _, err = m.pdClient.UpdateServiceGCSafePoint(ctx, + m.serviceID, serviceSafePointTTL, ts) + } + return err +} + +func (m *gcTTLManager) start(ctx context.Context) { + // It would be OK since TTL won't be zero, so gapTime should > `0. + updateGapTime := time.Duration(serviceSafePointTTL) * time.Second / preUpdateServiceSafePointFactor + + updateTick := time.NewTicker(updateGapTime) + + _ = m.updateGCTTL(ctx) + go func() { + defer updateTick.Stop() + for { + select { + case <-ctx.Done(): + log.L().Info("service safe point keeper exited") + return + case <-updateTick.C: + if err := m.updateGCTTL(ctx); err != nil { + log.L().Warn("failed to update service safe point, backup may fail if gc triggered", + zap.Error(err), + ) + } + } + } + }() +} + //////////////////////////////////////////////////////////////// var ( diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 9bf8cda11..040c03dad 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -124,7 +124,7 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { func MockDoChecksumCtx() context.Context { ctx := context.Background() manager := newGCLifeTimeManager() - return context.WithValue(ctx, &gcLifeTimeKey, manager) + return context.WithValue(ctx, &checksumManagerKey, manager) } func (s *restoreSuite) TestDoChecksum(c *C) { diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index e2f07a149..41da4e306 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -222,6 +222,7 @@ func (timgr *TiDBManager) LoadSchemaInfo( } tableInfo := &TidbTableInfo{ ID: tbl.ID, + DB: schema.Name, Name: tableName, Core: tbl, } From 47a2c314d36f03192a6c41bb558b98445d42fe11 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 11 Sep 2020 16:30:01 +0800 Subject: [PATCH 2/8] fix snapshot ts --- lightning/restore/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 524821ef2..4ec331c83 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1772,7 +1772,7 @@ func newTiKVChecksumManager(client tidbkv.Client, pdClient pd.Client) *tikvCheck } func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { - builder := 
checksum.NewExecutorBuilder(tableInfo.Core, 0) + builder := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)) executor, err := builder.Build() if err != nil { return nil, errors.Trace(err) From 56dd47a5897ebfcc0030d8a68a709f10b9e6311c Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 11 Sep 2020 16:58:30 +0800 Subject: [PATCH 3/8] resolve a comment --- lightning/restore/restore.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 4ec331c83..fc6fe81a9 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -25,16 +25,8 @@ import ( "sync/atomic" "time" - "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/br/pkg/checksum" - - "github.com/pingcap/tidb/store/tikv/oracle" - - pd "github.com/tikv/pd/client" - "github.com/pingcap/br/pkg/storage" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -42,8 +34,11 @@ import ( tidbcfg "github.com/pingcap/tidb/config" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" + pd "github.com/tikv/pd/client" "go.uber.org/zap" "modernc.org/mathutil" @@ -821,7 +816,7 @@ func (rc *RestoreController) newChecksumManager() (ChecksumManager, error) { return nil, errors.Trace(err) } - // for v4.0.0 or update, we can use + // for v4.0.0 or upper, we can use the gc ttl api var manager ChecksumManager if pdVersion.Major >= 4 { tlsOpt := rc.tls.ToPDSecurityOption() From d889655774bb8b6f140544ab1d5b6e957ae963de Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 11 Sep 2020 17:43:06 +0800 Subject: [PATCH 4/8] fix unit test --- lightning/restore/restore.go | 9 ++------- lightning/restore/restore_test.go | 27 ++++++++++++++------------- lightning/restore/tidb_test.go | 2 ++ 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index fc6fe81a9..7dedac831 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -624,7 +624,7 @@ func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { return err } m.oriGCLifeTime = oriGCLifeTime - err = increaseGCLifeTime(ctx, db) + err = increaseGCLifeTime(ctx, m, db) if err != nil { return err } @@ -1718,14 +1718,9 @@ func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteC return cs, err } -func increaseGCLifeTime(ctx context.Context, db *sql.DB) (err error) { +func increaseGCLifeTime(ctx context.Context, manager *gcLifeTimeManager, db *sql.DB) (err error) { // checksum command usually takes a long time to execute, // so here need to increase the gcLifeTime for single transaction. - - // try to get gcLifeTimeManager from context first. - // DoChecksum has assure this getting action success. 
- manager, _ := ctx.Value(&checksumManagerKey).(*gcLifeTimeManager) - var increaseGCLifeTime bool if manager.oriGCLifeTime != "" { ori, err := time.ParseDuration(manager.oriGCLifeTime) diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 040c03dad..e10b318ca 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -15,6 +15,7 @@ package restore import ( "context" + "database/sql" "fmt" "io/ioutil" "path/filepath" @@ -121,9 +122,9 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { }) } -func MockDoChecksumCtx() context.Context { +func MockDoChecksumCtx(db *sql.DB) context.Context { ctx := context.Background() - manager := newGCLifeTimeManager() + manager := newTiDBChecksumExecutor(db) return context.WithValue(ctx, &checksumManagerKey, manager) } @@ -146,8 +147,8 @@ func (s *restoreSuite) TestDoChecksum(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() - checksum, err := DoChecksum(ctx, db, "`test`.`t`") + ctx := MockDoChecksumCtx(db) + checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) c.Assert(err, IsNil) c.Assert(*checksum, DeepEquals, RemoteChecksum{ Schema: "test", @@ -183,7 +184,7 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() + ctx := MockDoChecksumCtx(db) // db.Close() will close all connections from its idle pool, set it 1 to expect one close db.SetMaxIdleConns(1) @@ -192,7 +193,7 @@ func (s *restoreSuite) TestDoChecksumParallel(c *C) { for i := 0; i < 5; i++ { go func() { defer wg.Done() - checksum, err := DoChecksum(ctx, db, "`test`.`t`") + checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) c.Assert(err, IsNil) c.Assert(*checksum, DeepEquals, RemoteChecksum{ Schema: "test", @@ -226,12 +227,12 @@ func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() + ctx := MockDoChecksumCtx(db) var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { go func() { - _, err = DoChecksum(ctx, db, "`test`.`t`") + _, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled") wg.Done() }() @@ -258,8 +259,8 @@ func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() - _, err = DoChecksum(ctx, db, "`test`.`t`") + ctx := MockDoChecksumCtx(db) + _, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*") c.Assert(db.Close(), IsNil) @@ -370,7 +371,7 @@ func (s *tableRestoreSuiteBase) SetUpSuite(c *C) { c.Assert(err, IsNil) core.State = model.StatePublic - s.tableInfo = &TidbTableInfo{Name: "table", Core: core} + s.tableInfo = &TidbTableInfo{Name: "table", DB: "db", Core: core} s.dbInfo = &TidbDBInfo{ Name: "db", Tables: map[string]*TidbTableInfo{"table": s.tableInfo}, @@ -590,7 +591,7 @@ func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() + ctx := MockDoChecksumCtx(db) err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(1234567, 12345, 1234567890)) c.Assert(err, IsNil) @@ -618,7 +619,7 @@ func (s *tableRestoreSuite) TestCompareChecksumFailure(c 
*C) { WillReturnResult(sqlmock.NewResult(2, 1)) mock.ExpectClose() - ctx := MockDoChecksumCtx() + ctx := MockDoChecksumCtx(db) err = s.tr.compareChecksum(ctx, db, verification.MakeKVChecksum(9876543, 54321, 1357924680)) c.Assert(err, ErrorMatches, "checksum mismatched.*") diff --git a/lightning/restore/tidb_test.go b/lightning/restore/tidb_test.go index 6428d19c2..81f0e09e0 100644 --- a/lightning/restore/tidb_test.go +++ b/lightning/restore/tidb_test.go @@ -251,11 +251,13 @@ func (s *tidbSuite) TestLoadSchemaInfo(c *C) { Tables: map[string]*checkpoints.TidbTableInfo{ "t1": { ID: 100, + DB: "db", Name: "t1", Core: tableInfos[0], }, "t2": { ID: 101, + DB: "db", Name: "t2", Core: tableInfos[1], }, From a3b156b9b9ba20da2fe7ecc33af1242956bb196e Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 23 Oct 2020 14:52:08 +0800 Subject: [PATCH 5/8] resolve comments --- lightning/restore/restore.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 4dc23d681..25d29fb6c 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1966,7 +1966,14 @@ func (m *gcTTLManager) start(ctx context.Context) { updateTick := time.NewTicker(updateGapTime) - _ = m.updateGCTTL(ctx) + updateGCTTL := func() { + if err := m.updateGCTTL(ctx); err != nil { + log.L().Warn("failed to update service safe point, checksum may fail if gc triggered", zap.Error(err)) + } + } + + // trigger a service gc ttl at start + updateGCTTL() go func() { defer updateTick.Stop() for { @@ -1975,11 +1982,7 @@ func (m *gcTTLManager) start(ctx context.Context) { log.L().Info("service safe point keeper exited") return case <-updateTick.C: - if err := m.updateGCTTL(ctx); err != nil { - log.L().Warn("failed to update service safe point, backup may fail if gc triggered", - zap.Error(err), - ) - } + updateGCTTL() } } }() From 012729af9524fba3a796e7c78a10fb575f56401a Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 23 Oct 2020 17:10:22 +0800 Subject: [PATCH 6/8] add a unit test --- lightning/restore/restore_test.go | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 303fc056d..8c6576fc9 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -23,6 +23,8 @@ import ( "sync" "time" + pd "github.com/tikv/pd/client" + "github.com/DATA-DOG/go-sqlmock" "github.com/golang/mock/gomock" "github.com/pingcap/br/pkg/storage" @@ -1000,3 +1002,41 @@ func (s *chunkRestoreSuite) TestRestore(c *C) { c.Assert(err, IsNil) c.Assert(saveCpCh, HasLen, 2) } + +type testPDClient struct { + pd.Client +} + +func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return 0, nil +} + +type gcTTLManagerSuite struct{} + +var _ = Suite(&gcTTLManagerSuite{}) + +func (s *gcTTLManagerSuite) TestGcTTLManager(c *C) { + manager := gcTTLManager{pdClient: &testPDClient{}} + ctx := context.Background() + + for i := uint64(1); i <= 5; i++ { + err := manager.addOneJob(ctx, fmt.Sprintf("test%d", i), i) + c.Assert(err, IsNil) + c.Assert(manager.currentTs, Equals, uint64(1)) + } + + manager.removeOneJob("test2") + c.Assert(manager.currentTs, Equals, uint64(1)) + + manager.removeOneJob("test1") + c.Assert(manager.currentTs, Equals, uint64(3)) + + manager.removeOneJob("test3") + c.Assert(manager.currentTs, Equals, uint64(4)) + + manager.removeOneJob("test4") + 
c.Assert(manager.currentTs, Equals, uint64(5)) + + manager.removeOneJob("test5") + c.Assert(manager.currentTs, Equals, uint64(0)) +} From 8212c2492e9eb48eaac22f42473a062ecb89e2f1 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 23 Oct 2020 19:32:19 +0800 Subject: [PATCH 7/8] split checksum into a separated file and fix comment --- lightning/restore/checksum.go | 360 +++++++++++++++++++++++++++++ lightning/restore/checksum_test.go | 200 ++++++++++++++++ lightning/restore/restore.go | 345 +-------------------------- lightning/restore/restore_test.go | 188 --------------- 4 files changed, 565 insertions(+), 528 deletions(-) create mode 100644 lightning/restore/checksum.go create mode 100644 lightning/restore/checksum_test.go diff --git a/lightning/restore/checksum.go b/lightning/restore/checksum.go new file mode 100644 index 000000000..968c599df --- /dev/null +++ b/lightning/restore/checksum.go @@ -0,0 +1,360 @@ +package restore + +import ( + "container/heap" + "context" + "database/sql" + "fmt" + "sync" + "time" + + "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/oracle" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" + + . "github.com/pingcap/tidb-lightning/lightning/checkpoints" + "github.com/pingcap/tidb-lightning/lightning/common" + "github.com/pingcap/tidb-lightning/lightning/log" + "github.com/pingcap/tidb-lightning/lightning/metric" +) + +// RemoteChecksum represents a checksum result got from tidb. +type RemoteChecksum struct { + Schema string + Table string + Checksum uint64 + TotalKVs uint64 + TotalBytes uint64 +} + +type ChecksumManager interface { + Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) +} + +// fetch checksum for tidb sql client +type tidbChecksumExecutor struct { + db *sql.DB + manager *gcLifeTimeManager +} + +func newTiDBChecksumExecutor(db *sql.DB) *tidbChecksumExecutor { + return &tidbChecksumExecutor{ + db: db, + manager: newGCLifeTimeManager(), + } +} + +func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + var err error + if err = e.manager.addOneJob(ctx, e.db); err != nil { + return nil, err + } + + // set it back finally + defer e.manager.removeOneJob(ctx, e.db) + + tableName := common.UniqueTable(tableInfo.DB, tableInfo.Name) + + task := log.With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum") + + // ADMIN CHECKSUM TABLE
<table>,<table>
example. + // mysql> admin checksum table test.t; + // +---------+------------+---------------------+-----------+-------------+ + // | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes | + // +---------+------------+---------------------+-----------+-------------+ + // | test | t | 8520875019404689597 | 7296873 | 357601387 | + // +---------+------------+---------------------+-----------+-------------+ + + cs := RemoteChecksum{} + err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", + "ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes, + ) + dur := task.End(zap.ErrorLevel, err) + metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) + if err != nil { + return nil, errors.Trace(err) + } + return &cs, nil +} + +// DoChecksum do checksum for tables. +// table should be in .
, format. e.g. foo.bar +func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteChecksum, error) { + var err error + manager, ok := ctx.Value(&checksumManagerKey).(ChecksumManager) + if !ok { + return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") + } + + task := log.With(zap.String("table", table.Name)).Begin(zap.InfoLevel, "remote checksum") + + cs, err := manager.Checksum(ctx, table) + dur := task.End(zap.ErrorLevel, err) + metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) + + return cs, err +} + +type gcLifeTimeManager struct { + runningJobsLock sync.Mutex + runningJobs int + oriGCLifeTime string +} + +func newGCLifeTimeManager() *gcLifeTimeManager { + // Default values of three member are enough to initialize this struct + return &gcLifeTimeManager{} +} + +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has not been increased. +// if m.runningJobs > 0, GC life time has been increased. +// m.runningJobs won't be negative(overflow) since index concurrency is relatively small +func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + if m.runningJobs == 0 { + oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) + if err != nil { + return err + } + m.oriGCLifeTime = oriGCLifeTime + err = increaseGCLifeTime(ctx, m, db) + if err != nil { + return err + } + } + m.runningJobs += 1 + return nil +} + +// Pre- and post-condition: +// if m.runningJobs == 0, GC life time has been tried to recovered. If this try fails, a warning will be printed. +// if m.runningJobs > 0, GC life time has not been recovered. +// m.runningJobs won't minus to negative since removeOneJob follows a successful addOneJob. +func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { + m.runningJobsLock.Lock() + defer m.runningJobsLock.Unlock() + + m.runningJobs -= 1 + if m.runningJobs == 0 { + err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime) + if err != nil { + query := fmt.Sprintf( + "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", + m.oriGCLifeTime, + ) + log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", + zap.String("query", query), + log.ShortError(err), + ) + } + } +} + +func increaseGCLifeTime(ctx context.Context, manager *gcLifeTimeManager, db *sql.DB) (err error) { + // checksum command usually takes a long time to execute, + // so here need to increase the gcLifeTime for single transaction. 
+ var increaseGCLifeTime bool + if manager.oriGCLifeTime != "" { + ori, err := time.ParseDuration(manager.oriGCLifeTime) + if err != nil { + return errors.Trace(err) + } + if ori < defaultGCLifeTime { + increaseGCLifeTime = true + } + } else { + increaseGCLifeTime = true + } + + if increaseGCLifeTime { + err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String()) + if err != nil { + return err + } + } + + failpoint.Inject("IncreaseGCUpdateDuration", nil) + + return nil +} + +type tikvChecksumManager struct { + client kv.Client + manager gcTTLManager +} + +// newTiKVChecksumManager return a new tikv checksum manager +func newTiKVChecksumManager(client kv.Client, pdClient pd.Client) *tikvChecksumManager { + return &tikvChecksumManager{ + client: client, + manager: gcTTLManager{ + pdClient: pdClient, + }, + } +} + +func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + builder := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)) + executor, err := builder.Build() + if err != nil { + return nil, errors.Trace(err) + } + + res, err := executor.Execute(ctx, e.client, func() {}) + if err != nil { + return nil, errors.Trace(err) + } + + return &RemoteChecksum{ + Schema: tableInfo.DB, + Table: tableInfo.Name, + Checksum: res.Checksum, + TotalBytes: res.TotalBytes, + TotalKVs: res.TotalKvs, + }, nil +} + +func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { + tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) + err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0)) + if err != nil { + return nil, errors.Trace(err) + } + + return e.checksumDB(ctx, tableInfo) +} + +type tableChecksumTS struct { + table string + gcSafeTS uint64 +} + +// following function are for implement `heap.Interface` + +func (m *gcTTLManager) Len() int { + return len(m.tableGCSafeTS) +} + +func (m *gcTTLManager) Less(i, j int) bool { + return m.tableGCSafeTS[i].gcSafeTS < m.tableGCSafeTS[j].gcSafeTS +} + +func (m *gcTTLManager) Swap(i, j int) { + m.tableGCSafeTS[i], m.tableGCSafeTS[j] = m.tableGCSafeTS[j], m.tableGCSafeTS[i] +} + +func (m *gcTTLManager) Push(x interface{}) { + m.tableGCSafeTS = append(m.tableGCSafeTS, x.(*tableChecksumTS)) +} + +func (m *gcTTLManager) Pop() interface{} { + i := m.tableGCSafeTS[len(m.tableGCSafeTS)-1] + m.tableGCSafeTS = m.tableGCSafeTS[:len(m.tableGCSafeTS)-1] + return i +} + +type gcTTLManager struct { + lock sync.Mutex + pdClient pd.Client + // tableGCSafeTS is a binary heap that stored active checksum jobs GC safe point ts + tableGCSafeTS []*tableChecksumTS + currentTs uint64 + serviceID string +} + +func (m *gcTTLManager) addOneJob(ctx context.Context, table string, ts uint64) error { + m.lock.Lock() + defer m.lock.Unlock() + var curTs uint64 + if len(m.tableGCSafeTS) > 0 { + curTs = m.tableGCSafeTS[0].gcSafeTS + } + m.Push(&tableChecksumTS{table: table, gcSafeTS: ts}) + heap.Fix(m, len(m.tableGCSafeTS)-1) + m.currentTs = m.tableGCSafeTS[0].gcSafeTS + if curTs == 0 || m.currentTs < curTs { + return m.doUpdateGCTTL(ctx, m.currentTs) + } + return nil +} + +func (m *gcTTLManager) removeOneJob(table string) { + m.lock.Lock() + defer m.lock.Unlock() + idx := -1 + for i := 0; i < len(m.tableGCSafeTS); i++ { + if m.tableGCSafeTS[i].table == table { + idx = i + break + } + } + + if idx >= 0 { + l := len(m.tableGCSafeTS) + m.tableGCSafeTS[idx] = m.tableGCSafeTS[l-1] + m.tableGCSafeTS = m.tableGCSafeTS[:l-1] + if 
l > 1 && idx < l-1 { + heap.Fix(m, idx) + } + } + + var newTs uint64 + if len(m.tableGCSafeTS) > 0 { + newTs = m.tableGCSafeTS[0].gcSafeTS + } + m.currentTs = newTs +} + +func (m *gcTTLManager) updateGCTTL(ctx context.Context) error { + m.lock.Lock() + currentTs := m.currentTs + m.lock.Unlock() + return m.doUpdateGCTTL(ctx, currentTs) +} + +func (m *gcTTLManager) doUpdateGCTTL(ctx context.Context, ts uint64) error { + log.L().Debug("update PD safePoint limit with TTL", + zap.Uint64("currnet_ts", ts)) + var err error + if ts > 0 { + _, err = m.pdClient.UpdateServiceGCSafePoint(ctx, + m.serviceID, serviceSafePointTTL, ts) + } + return err +} + +func (m *gcTTLManager) start(ctx context.Context) { + // It would be OK since TTL won't be zero, so gapTime should > `0. + updateGapTime := time.Duration(serviceSafePointTTL) * time.Second / preUpdateServiceSafePointFactor + + updateTick := time.NewTicker(updateGapTime) + + updateGCTTL := func() { + if err := m.updateGCTTL(ctx); err != nil { + log.L().Warn("failed to update service safe point, checksum may fail if gc triggered", zap.Error(err)) + } + } + + // trigger a service gc ttl at start + updateGCTTL() + go func() { + defer updateTick.Stop() + for { + select { + case <-ctx.Done(): + log.L().Info("service safe point keeper exited") + return + case <-updateTick.C: + updateGCTTL() + } + } + }() +} diff --git a/lightning/restore/checksum_test.go b/lightning/restore/checksum_test.go new file mode 100644 index 000000000..ba1ae234d --- /dev/null +++ b/lightning/restore/checksum_test.go @@ -0,0 +1,200 @@ +package restore + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + pd "github.com/tikv/pd/client" + + "github.com/DATA-DOG/go-sqlmock" + . "github.com/pingcap/check" + "github.com/pingcap/errors" + + . "github.com/pingcap/tidb-lightning/lightning/checkpoints" +) + +var _ = Suite(&checksumSuite{}) + +type checksumSuite struct{} + +func MockDoChecksumCtx(db *sql.DB) context.Context { + ctx := context.Background() + manager := newTiDBChecksumExecutor(db) + return context.WithValue(ctx, &checksumManagerKey, manager) +} + +func (s *checksumSuite) TestDoChecksum(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). + WillReturnRows( + sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). + AddRow("test", "t", 8520875019404689597, 7296873, 357601387), + ) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). 
+ WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx(db) + checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) + c.Assert(err, IsNil) + c.Assert(*checksum, DeepEquals, RemoteChecksum{ + Schema: "test", + Table: "t", + Checksum: 8520875019404689597, + TotalKVs: 7296873, + TotalBytes: 357601387, + }) + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *checksumSuite) TestDoChecksumParallel(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnResult(sqlmock.NewResult(1, 1)) + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). + WillDelayFor(100 * time.Millisecond). + WillReturnRows( + sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). + AddRow("test", "t", 8520875019404689597, 7296873, 357601387), + ) + } + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(2, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx(db) + + // db.Close() will close all connections from its idle pool, set it 1 to expect one close + db.SetMaxIdleConns(1) + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + defer wg.Done() + checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) + c.Assert(err, IsNil) + c.Assert(*checksum, DeepEquals, RemoteChecksum{ + Schema: "test", + Table: "t", + Checksum: 8520875019404689597, + TotalKVs: 7296873, + TotalBytes: 357601387, + }) + }() + } + wg.Wait() + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *checksumSuite) TestIncreaseGCLifeTimeFail(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + for i := 0; i < 5; i++ { + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("100h0m0s"). + WillReturnError(errors.Annotate(context.Canceled, "update gc error")) + } + // This recover GC Life Time SQL should not be executed in DoChecksum + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("10m"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx(db) + var wg sync.WaitGroup + wg.Add(5) + for i := 0; i < 5; i++ { + go func() { + _, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) + c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled") + wg.Done() + }() + } + wg.Wait() + + _, err = db.Exec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? 
WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E", "10m") + c.Assert(err, IsNil) + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("300h")) + mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). + WillReturnError(errors.Annotate(context.Canceled, "mock syntax error")) + mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). + WithArgs("300h"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectClose() + + ctx := MockDoChecksumCtx(db) + _, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) + c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*") + + c.Assert(db.Close(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +type testPDClient struct { + pd.Client +} + +func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return 0, nil +} + +func (s *checksumSuite) TestGcTTLManager(c *C) { + manager := gcTTLManager{pdClient: &testPDClient{}} + ctx := context.Background() + + for i := uint64(1); i <= 5; i++ { + err := manager.addOneJob(ctx, fmt.Sprintf("test%d", i), i) + c.Assert(err, IsNil) + c.Assert(manager.currentTs, Equals, uint64(1)) + } + + manager.removeOneJob("test2") + c.Assert(manager.currentTs, Equals, uint64(1)) + + manager.removeOneJob("test1") + c.Assert(manager.currentTs, Equals, uint64(3)) + + manager.removeOneJob("test3") + c.Assert(manager.currentTs, Equals, uint64(4)) + + manager.removeOneJob("test4") + c.Assert(manager.currentTs, Equals, uint64(5)) + + manager.removeOneJob("test5") + c.Assert(manager.currentTs, Equals, uint64(0)) +} diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 0f93df895..c9b80407d 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -14,7 +14,6 @@ package restore import ( - "container/heap" "context" "database/sql" "fmt" @@ -26,7 +25,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/br/pkg/checksum" "github.com/pingcap/br/pkg/pdutil" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/errors" @@ -34,10 +32,8 @@ import ( sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/parser/model" tidbcfg "github.com/pingcap/tidb/config" - tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/collate" @@ -642,64 +638,6 @@ func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan } } -type gcLifeTimeManager struct { - runningJobsLock sync.Mutex - runningJobs int - oriGCLifeTime string -} - -func newGCLifeTimeManager() *gcLifeTimeManager { - // Default values of three member are enough to initialize this struct - return &gcLifeTimeManager{} -} - -// Pre- and post-condition: -// if m.runningJobs == 0, GC life time has not been increased. -// if m.runningJobs > 0, GC life time has been increased. 
-// m.runningJobs won't be negative(overflow) since index concurrency is relatively small -func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error { - m.runningJobsLock.Lock() - defer m.runningJobsLock.Unlock() - - if m.runningJobs == 0 { - oriGCLifeTime, err := ObtainGCLifeTime(ctx, db) - if err != nil { - return err - } - m.oriGCLifeTime = oriGCLifeTime - err = increaseGCLifeTime(ctx, m, db) - if err != nil { - return err - } - } - m.runningJobs += 1 - return nil -} - -// Pre- and post-condition: -// if m.runningJobs == 0, GC life time has been tried to recovered. If this try fails, a warning will be printed. -// if m.runningJobs > 0, GC life time has not been recovered. -// m.runningJobs won't minus to negative since removeOneJob follows a successful addOneJob. -func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) { - m.runningJobsLock.Lock() - defer m.runningJobsLock.Unlock() - - m.runningJobs -= 1 - if m.runningJobs == 0 { - err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime) - if err != nil { - query := fmt.Sprintf( - "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", - m.oriGCLifeTime, - ) - log.L().Warn("revert GC lifetime failed, please reset the GC lifetime manually after Lightning completed", - zap.String("query", query), - log.ShortError(err), - ) - } - } -} - var checksumManagerKey struct{} func (rc *RestoreController) restoreTables(ctx context.Context) error { @@ -876,6 +814,11 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { } func (rc *RestoreController) newChecksumManager() (ChecksumManager, error) { + // if we don't need checksum, just return nil + if rc.cfg.PostRestore.Checksum == config.OpLevelOff { + return nil, nil + } + pdAddr := rc.cfg.TiDB.PdAddr pdVersion, err := common.FetchPDVersion(rc.tls, pdAddr) if err != nil { @@ -1735,284 +1678,6 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, db *sql.DB) error { return err } -// RemoteChecksum represents a checksum result got from tidb. -type RemoteChecksum struct { - Schema string - Table string - Checksum uint64 - TotalKVs uint64 - TotalBytes uint64 -} - -type ChecksumManager interface { - Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) -} - -// fetch checksum for tidb sql client -type tidbChecksumExecutor struct { - db *sql.DB - manager *gcLifeTimeManager -} - -func newTiDBChecksumExecutor(db *sql.DB) *tidbChecksumExecutor { - return &tidbChecksumExecutor{ - db: db, - manager: newGCLifeTimeManager(), - } -} - -func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { - var err error - if err = e.manager.addOneJob(ctx, e.db); err != nil { - return nil, err - } - - // set it back finally - defer e.manager.removeOneJob(ctx, e.db) - - tableName := common.UniqueTable(tableInfo.DB, tableInfo.Name) - - task := log.With(zap.String("table", tableName)).Begin(zap.InfoLevel, "remote checksum") - - // ADMIN CHECKSUM TABLE
<table>,<table>
example. - // mysql> admin checksum table test.t; - // +---------+------------+---------------------+-----------+-------------+ - // | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes | - // +---------+------------+---------------------+-----------+-------------+ - // | test | t | 8520875019404689597 | 7296873 | 357601387 | - // +---------+------------+---------------------+-----------+-------------+ - - cs := RemoteChecksum{} - err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum", - "ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes, - ) - dur := task.End(zap.ErrorLevel, err) - metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) - if err != nil { - return nil, errors.Trace(err) - } - return &cs, nil -} - -// DoChecksum do checksum for tables. -// table should be in .
, format. e.g. foo.bar -func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteChecksum, error) { - var err error - manager, ok := ctx.Value(&checksumManagerKey).(ChecksumManager) - if !ok { - return nil, errors.New("No gcLifeTimeManager found in context, check context initialization") - } - - task := log.With(zap.String("table", table.Name)).Begin(zap.InfoLevel, "remote checksum") - - cs, err := manager.Checksum(ctx, table) - dur := task.End(zap.ErrorLevel, err) - metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) - - return cs, err -} - -func increaseGCLifeTime(ctx context.Context, manager *gcLifeTimeManager, db *sql.DB) (err error) { - // checksum command usually takes a long time to execute, - // so here need to increase the gcLifeTime for single transaction. - var increaseGCLifeTime bool - if manager.oriGCLifeTime != "" { - ori, err := time.ParseDuration(manager.oriGCLifeTime) - if err != nil { - return errors.Trace(err) - } - if ori < defaultGCLifeTime { - increaseGCLifeTime = true - } - } else { - increaseGCLifeTime = true - } - - if increaseGCLifeTime { - err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String()) - if err != nil { - return err - } - } - - failpoint.Inject("IncreaseGCUpdateDuration", nil) - - return nil -} - -type tikvChecksumManager struct { - client tidbkv.Client - manager gcTTLManager -} - -// newTiKVChecksumManager return a new tikv checksum manager -func newTiKVChecksumManager(client tidbkv.Client, pdClient pd.Client) *tikvChecksumManager { - return &tikvChecksumManager{ - client: client, - manager: gcTTLManager{ - pdClient: pdClient, - }, - } -} - -func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { - builder := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)) - executor, err := builder.Build() - if err != nil { - return nil, errors.Trace(err) - } - - res, err := executor.Execute(ctx, e.client, func() {}) - if err != nil { - return nil, errors.Trace(err) - } - - return &RemoteChecksum{ - Schema: tableInfo.DB, - Table: tableInfo.Name, - Checksum: res.Checksum, - TotalBytes: res.TotalBytes, - TotalKVs: res.TotalKvs, - }, nil -} - -func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { - tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) - err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0)) - if err != nil { - return nil, errors.Trace(err) - } - - return e.checksumDB(ctx, tableInfo) -} - -type tableChecksumTS struct { - table string - gcSafeTS uint64 -} - -// following function are for implement `heap.Interface` - -func (m *gcTTLManager) Len() int { - return len(m.tableGCSafeTS) -} - -func (m *gcTTLManager) Less(i, j int) bool { - return m.tableGCSafeTS[i].gcSafeTS < m.tableGCSafeTS[j].gcSafeTS -} - -func (m *gcTTLManager) Swap(i, j int) { - m.tableGCSafeTS[i], m.tableGCSafeTS[j] = m.tableGCSafeTS[j], m.tableGCSafeTS[i] -} - -func (m *gcTTLManager) Push(x interface{}) { - m.tableGCSafeTS = append(m.tableGCSafeTS, x.(*tableChecksumTS)) -} - -func (m *gcTTLManager) Pop() interface{} { - i := m.tableGCSafeTS[len(m.tableGCSafeTS)-1] - m.tableGCSafeTS = m.tableGCSafeTS[:len(m.tableGCSafeTS)-1] - return i -} - -type gcTTLManager struct { - lock sync.Mutex - pdClient pd.Client - // tableGCSafeTS is a binary heap that stored active checksum jobs GC safe point ts - tableGCSafeTS []*tableChecksumTS - currentTs uint64 - serviceID string -} - -func (m 
*gcTTLManager) addOneJob(ctx context.Context, table string, ts uint64) error { - m.lock.Lock() - defer m.lock.Unlock() - var curTs uint64 - if len(m.tableGCSafeTS) > 0 { - curTs = m.tableGCSafeTS[0].gcSafeTS - } - m.Push(&tableChecksumTS{table: table, gcSafeTS: ts}) - heap.Fix(m, len(m.tableGCSafeTS)-1) - m.currentTs = m.tableGCSafeTS[0].gcSafeTS - if curTs == 0 || m.currentTs < curTs { - return m.doUpdateGCTTL(ctx, m.currentTs) - } - return nil -} - -func (m *gcTTLManager) removeOneJob(table string) { - m.lock.Lock() - defer m.lock.Unlock() - idx := -1 - for i := 0; i < len(m.tableGCSafeTS); i++ { - if m.tableGCSafeTS[i].table == table { - idx = i - break - } - } - - if idx >= 0 { - l := len(m.tableGCSafeTS) - m.tableGCSafeTS[idx] = m.tableGCSafeTS[l-1] - m.tableGCSafeTS = m.tableGCSafeTS[:l-1] - if l > 1 && idx < l-1 { - heap.Fix(m, idx) - } - } - - var newTs uint64 - if len(m.tableGCSafeTS) > 0 { - newTs = m.tableGCSafeTS[0].gcSafeTS - } - m.currentTs = newTs -} - -func (m *gcTTLManager) updateGCTTL(ctx context.Context) error { - m.lock.Lock() - currentTs := m.currentTs - m.lock.Unlock() - return m.doUpdateGCTTL(ctx, currentTs) -} - -func (m *gcTTLManager) doUpdateGCTTL(ctx context.Context, ts uint64) error { - log.L().Debug("update PD safePoint limit with TTL", - zap.Uint64("currnet_ts", ts)) - var err error - if ts > 0 { - _, err = m.pdClient.UpdateServiceGCSafePoint(ctx, - m.serviceID, serviceSafePointTTL, ts) - } - return err -} - -func (m *gcTTLManager) start(ctx context.Context) { - // It would be OK since TTL won't be zero, so gapTime should > `0. - updateGapTime := time.Duration(serviceSafePointTTL) * time.Second / preUpdateServiceSafePointFactor - - updateTick := time.NewTicker(updateGapTime) - - updateGCTTL := func() { - if err := m.updateGCTTL(ctx); err != nil { - log.L().Warn("failed to update service safe point, checksum may fail if gc triggered", zap.Error(err)) - } - } - - // trigger a service gc ttl at start - updateGCTTL() - go func() { - defer updateTick.Stop() - for { - select { - case <-ctx.Done(): - log.L().Info("service safe point keeper exited") - return - case <-updateTick.C: - updateGCTTL() - } - } - }() -} - //////////////////////////////////////////////////////////////// var ( diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index 8c6576fc9..bef2beb45 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -15,15 +15,10 @@ package restore import ( "context" - "database/sql" "fmt" "io/ioutil" "path/filepath" "sort" - "sync" - "time" - - pd "github.com/tikv/pd/client" "github.com/DATA-DOG/go-sqlmock" "github.com/golang/mock/gomock" @@ -124,151 +119,6 @@ func (s *restoreSuite) TestErrorSummaries(c *C) { }) } -func MockDoChecksumCtx(db *sql.DB) context.Context { - ctx := context.Background() - manager := newTiDBChecksumExecutor(db) - return context.WithValue(ctx, &checksumManagerKey, manager) -} - -func (s *restoreSuite) TestDoChecksum(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - - mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("100h0m0s"). - WillReturnResult(sqlmock.NewResult(1, 1)) - mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). 
- WillReturnRows( - sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). - AddRow("test", "t", 8520875019404689597, 7296873, 357601387), - ) - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("10m"). - WillReturnResult(sqlmock.NewResult(2, 1)) - mock.ExpectClose() - - ctx := MockDoChecksumCtx(db) - checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) - c.Assert(err, IsNil) - c.Assert(*checksum, DeepEquals, RemoteChecksum{ - Schema: "test", - Table: "t", - Checksum: 8520875019404689597, - TotalKVs: 7296873, - TotalBytes: 357601387, - }) - - c.Assert(db.Close(), IsNil) - c.Assert(mock.ExpectationsWereMet(), IsNil) -} - -func (s *restoreSuite) TestDoChecksumParallel(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - - mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("100h0m0s"). - WillReturnResult(sqlmock.NewResult(1, 1)) - for i := 0; i < 5; i++ { - mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E"). - WillDelayFor(100 * time.Millisecond). - WillReturnRows( - sqlmock.NewRows([]string{"Db_name", "Table_name", "Checksum_crc64_xor", "Total_kvs", "Total_bytes"}). - AddRow("test", "t", 8520875019404689597, 7296873, 357601387), - ) - } - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("10m"). - WillReturnResult(sqlmock.NewResult(2, 1)) - mock.ExpectClose() - - ctx := MockDoChecksumCtx(db) - - // db.Close() will close all connections from its idle pool, set it 1 to expect one close - db.SetMaxIdleConns(1) - var wg sync.WaitGroup - wg.Add(5) - for i := 0; i < 5; i++ { - go func() { - defer wg.Done() - checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"}) - c.Assert(err, IsNil) - c.Assert(*checksum, DeepEquals, RemoteChecksum{ - Schema: "test", - Table: "t", - Checksum: 8520875019404689597, - TotalKVs: 7296873, - TotalBytes: 357601387, - }) - }() - } - wg.Wait() - - c.Assert(db.Close(), IsNil) - c.Assert(mock.ExpectationsWereMet(), IsNil) -} - -func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - - for i := 0; i < 5; i++ { - mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m")) - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("100h0m0s"). - WillReturnError(errors.Annotate(context.Canceled, "update gc error")) - } - // This recover GC Life Time SQL should not be executed in DoChecksum - mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E"). - WithArgs("10m"). 
-func (s *restoreSuite) TestIncreaseGCLifeTimeFail(c *C) {
-	db, mock, err := sqlmock.New()
-	c.Assert(err, IsNil)
-
-	for i := 0; i < 5; i++ {
-		mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
-			WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("10m"))
-		mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
-			WithArgs("100h0m0s").
-			WillReturnError(errors.Annotate(context.Canceled, "update gc error"))
-	}
-	// This recover GC Life Time SQL should not be executed in DoChecksum
-	mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
-		WithArgs("10m").
-		WillReturnResult(sqlmock.NewResult(1, 1))
-	mock.ExpectClose()
-
-	ctx := MockDoChecksumCtx(db)
-	var wg sync.WaitGroup
-	wg.Add(5)
-	for i := 0; i < 5; i++ {
-		go func() {
-			_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
-			c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled")
-			wg.Done()
-		}()
-	}
-	wg.Wait()
-
-	_, err = db.Exec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E", "10m")
-	c.Assert(err, IsNil)
-
-	c.Assert(db.Close(), IsNil)
-	c.Assert(mock.ExpectationsWereMet(), IsNil)
-}
-
-func (s *restoreSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
-	db, mock, err := sqlmock.New()
-	c.Assert(err, IsNil)
-
-	mock.ExpectQuery("\\QSELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
-		WillReturnRows(sqlmock.NewRows([]string{"VARIABLE_VALUE"}).AddRow("300h"))
-	mock.ExpectQuery("\\QADMIN CHECKSUM TABLE `test`.`t`\\E").
-		WillReturnError(errors.Annotate(context.Canceled, "mock syntax error"))
-	mock.ExpectExec("\\QUPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'\\E").
-		WithArgs("300h").
-		WillReturnResult(sqlmock.NewResult(1, 1))
-	mock.ExpectClose()
-
-	ctx := MockDoChecksumCtx(db)
-	_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
-	c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")
-
-	c.Assert(db.Close(), IsNil)
-	c.Assert(mock.ExpectationsWereMet(), IsNil)
-}
-
 func (s *restoreSuite) TestVerifyCheckpoint(c *C) {
 	dir := c.MkDir()
 	cpdb := checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb"))
@@ -1002,41 +852,3 @@ func (s *chunkRestoreSuite) TestRestore(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(saveCpCh, HasLen, 2)
 }
-
-type testPDClient struct {
-	pd.Client
-}
-
-func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
-	return 0, nil
-}
-
-type gcTTLManagerSuite struct{}
-
-var _ = Suite(&gcTTLManagerSuite{})
-
-func (s *gcTTLManagerSuite) TestGcTTLManager(c *C) {
-	manager := gcTTLManager{pdClient: &testPDClient{}}
-	ctx := context.Background()
-
-	for i := uint64(1); i <= 5; i++ {
-		err := manager.addOneJob(ctx, fmt.Sprintf("test%d", i), i)
-		c.Assert(err, IsNil)
-		c.Assert(manager.currentTs, Equals, uint64(1))
-	}
-
-	manager.removeOneJob("test2")
-	c.Assert(manager.currentTs, Equals, uint64(1))
-
-	manager.removeOneJob("test1")
-	c.Assert(manager.currentTs, Equals, uint64(3))
-
-	manager.removeOneJob("test3")
-	c.Assert(manager.currentTs, Equals, uint64(4))
-
-	manager.removeOneJob("test4")
-	c.Assert(manager.currentTs, Equals, uint64(5))
-
-	manager.removeOneJob("test5")
-	c.Assert(manager.currentTs, Equals, uint64(0))
-}
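The final patch in the series, below, moves this checksum plumbing into lightning/restore/checksum.go. The newer code path it keeps relies on the PD service GC safe point: register a safe point with a TTL and refresh it well before the TTL expires (the removed gcTTLManager.start refreshes every serviceSafePointTTL/preUpdateServiceSafePointFactor, i.e. a third of the TTL). The sketch below is a self-contained, hypothetical stand-in for that keep-alive pattern; only the UpdateServiceGCSafePoint signature mirrors the pd.Client call the patches use.

package main

import (
	"context"
	"fmt"
	"time"
)

// safePointUpdater is the one PD call this pattern needs; pd.Client in the
// patch provides the same method.
type safePointUpdater interface {
	UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
}

type fakePD struct{}

func (fakePD) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
	fmt.Printf("refresh service safe point: id=%s ttl=%ds ts=%d\n", serviceID, ttl, safePoint)
	return 0, nil
}

// keepSafePointAlive refreshes the service GC safe point every ttl/3 so it
// never expires while a checksum is still reading the snapshot at safePoint.
func keepSafePointAlive(ctx context.Context, pd safePointUpdater, serviceID string, ttlSeconds int64, safePoint uint64) {
	refresh := func() {
		if _, err := pd.UpdateServiceGCSafePoint(ctx, serviceID, ttlSeconds, safePoint); err != nil {
			fmt.Println("refresh failed, checksum may fail if GC triggers:", err)
		}
	}
	refresh() // register immediately at start
	tick := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 3)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			refresh()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// With a 3-second TTL the loop refreshes roughly every second.
	keepSafePointAlive(ctx, fakePD{}, "lightning-example", 3, 42)
}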
"github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" . "github.com/pingcap/tidb-lightning/lightning/checkpoints" "github.com/pingcap/tidb-lightning/lightning/common" + "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/log" "github.com/pingcap/tidb-lightning/lightning/metric" ) +const ( + preUpdateServiceSafePointFactor = 3 + serviceSafePointTTL = 10 * 60 // 10 min in seconds +) + // RemoteChecksum represents a checksum result got from tidb. type RemoteChecksum struct { Schema string @@ -36,6 +43,47 @@ type ChecksumManager interface { Checksum(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) } +func newChecksumManager(rc *RestoreController) (ChecksumManager, error) { + // if we don't need checksum, just return nil + if rc.cfg.PostRestore.Checksum == config.OpLevelOff { + return nil, nil + } + + pdAddr := rc.cfg.TiDB.PdAddr + pdVersion, err := common.FetchPDVersion(rc.tls, pdAddr) + if err != nil { + return nil, errors.Trace(err) + } + + // for v4.0.0 or upper, we can use the gc ttl api + var manager ChecksumManager + if pdVersion.Major >= 4 { + tlsOpt := rc.tls.ToPDSecurityOption() + pdCli, err := pd.NewClient([]string{pdAddr}, tlsOpt) + if err != nil { + return nil, errors.Trace(err) + } + + if tlsOpt.CAPath != "" { + conf := tidbcfg.GetGlobalConfig() + conf.Security.ClusterSSLCA = tlsOpt.CAPath + conf.Security.ClusterSSLCert = tlsOpt.CertPath + conf.Security.ClusterSSLKey = tlsOpt.KeyPath + tidbcfg.StoreGlobalConfig(conf) + } + store, err := tikv.Driver{}.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr)) + if err != nil { + return nil, errors.Trace(err) + } + + manager = newTiKVChecksumManager(store.(tikv.Storage).GetClient(), pdCli) + } else { + manager = newTiDBChecksumExecutor(rc.tidbMgr.db) + } + + return manager, nil +} + // fetch checksum for tidb sql client type tidbChecksumExecutor struct { db *sql.DB diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index c9b80407d..cbd63354e 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -33,11 +33,9 @@ import ( "github.com/pingcap/parser/model" tidbcfg "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/collate" - pd "github.com/tikv/pd/client" "go.uber.org/zap" "modernc.org/mathutil" @@ -63,9 +61,7 @@ const ( ) const ( - indexEngineID = -1 - preUpdateServiceSafePointFactor = 3 - serviceSafePointTTL = 10 * 60 // 10 min in seconds + indexEngineID = -1 ) const ( @@ -680,7 +676,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { taskCh := make(chan task, rc.cfg.App.IndexConcurrency) defer close(taskCh) - manager, err := rc.newChecksumManager() + manager, err := newChecksumManager(rc) if err != nil { return errors.Trace(err) } @@ -813,47 +809,6 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { return err } -func (rc *RestoreController) newChecksumManager() (ChecksumManager, error) { - // if we don't need checksum, just return nil - if rc.cfg.PostRestore.Checksum == config.OpLevelOff { - return nil, nil - } - - pdAddr := rc.cfg.TiDB.PdAddr - pdVersion, err := common.FetchPDVersion(rc.tls, pdAddr) - if err != nil { - return nil, errors.Trace(err) - } - - // for v4.0.0 or 
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index c9b80407d..cbd63354e 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -33,11 +33,9 @@ import (
 	"github.com/pingcap/parser/model"
 	tidbcfg "github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/meta/autoid"
-	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/util/collate"
-	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 	"modernc.org/mathutil"
 
@@ -63,9 +61,7 @@ const (
 )
 
 const (
-	indexEngineID                   = -1
-	preUpdateServiceSafePointFactor = 3
-	serviceSafePointTTL             = 10 * 60 // 10 min in seconds
+	indexEngineID = -1
 )
 
 const (
@@ -680,7 +676,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
 	defer close(taskCh)
 
-	manager, err := rc.newChecksumManager()
+	manager, err := newChecksumManager(rc)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -813,47 +809,6 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 	return err
 }
 
-func (rc *RestoreController) newChecksumManager() (ChecksumManager, error) {
-	// if we don't need checksum, just return nil
-	if rc.cfg.PostRestore.Checksum == config.OpLevelOff {
-		return nil, nil
-	}
-
-	pdAddr := rc.cfg.TiDB.PdAddr
-	pdVersion, err := common.FetchPDVersion(rc.tls, pdAddr)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
-	// for v4.0.0 or upper, we can use the gc ttl api
-	var manager ChecksumManager
-	if pdVersion.Major >= 4 {
-		tlsOpt := rc.tls.ToPDSecurityOption()
-		pdCli, err := pd.NewClient([]string{pdAddr}, tlsOpt)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-
-		if tlsOpt.CAPath != "" {
-			conf := tidbcfg.GetGlobalConfig()
-			conf.Security.ClusterSSLCA = tlsOpt.CAPath
-			conf.Security.ClusterSSLCert = tlsOpt.CertPath
-			conf.Security.ClusterSSLKey = tlsOpt.KeyPath
-			tidbcfg.StoreGlobalConfig(conf)
-		}
-		store, err := tikv.Driver{}.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr))
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-
-		manager = newTiKVChecksumManager(store.(tikv.Storage).GetClient(), pdCli)
-	} else {
-		manager = newTiDBChecksumExecutor(rc.tidbMgr.db)
-	}
-
-	return manager, nil
-}
-
 func (t *TableRestore) restoreTable(
 	ctx context.Context,
 	rc *RestoreController,