diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 0241789e65103..927f3937963a0 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -312,13 +312,13 @@ func (bc *Client) StartCheckpointRunner( } } - bc.checkpointRunner = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher) - return nil + bc.checkpointRunner, err = checkpoint.StartCheckpointRunner(ctx, bc.storage, bc.cipher, bc.mgr.GetPDClient()) + return errors.Trace(err) } -func (bc *Client) WaitForFinishCheckpoint() { +func (bc *Client) WaitForFinishCheckpoint(ctx context.Context) { if bc.checkpointRunner != nil { - bc.checkpointRunner.WaitForFinish() + bc.checkpointRunner.WaitForFinish(ctx) } } diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel index 76a30d72885be..20e39dc39025b 100644 --- a/br/pkg/checkpoint/BUILD.bazel +++ b/br/pkg/checkpoint/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//oracle", "@org_uber_go_zap//:zap", ], ) @@ -29,5 +30,6 @@ go_test( "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index c462e78e949ad..0af1cefdf2594 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -44,11 +45,16 @@ const ( CheckpointDataDir = CheckpointDir + "/data" CheckpointChecksumDir = CheckpointDir + "/checksum" + CheckpointLockPath = CheckpointDir + "/checkpoint.lock" ) const MaxChecksumTotalCost float64 = 60.0 -const tickDuration = 30 * time.Second +const tickDurationForFlush = 30 * time.Second + +const tickDurationForLock = 4 * time.Minute + +const lockTimeToLive = 5 * time.Minute type CheckpointMessage struct { // start-key of the origin range @@ -132,6 +138,12 @@ func NewChecksumRunner() *ChecksumRunner { } } +func (cr *ChecksumRunner) RecordError(err error) { + cr.Lock() + cr.err = err + cr.Unlock() +} + // FlushChecksum save the checksum in the memory temporarily // and flush to the external storage if checksum take much time func (cr *ChecksumRunner) FlushChecksum( @@ -180,15 +192,10 @@ func (cr *ChecksumRunner) FlushChecksum( cr.wg.Add(1) cr.workerPool.Apply(func() { defer cr.wg.Done() - recordErr := func(err error) { - cr.Lock() - cr.err = err - cr.Unlock() - } content, err := json.Marshal(toBeFlushedChecksumItems) if err != nil { - recordErr(err) + cr.RecordError(err) return } @@ -200,37 +207,45 @@ func (cr *ChecksumRunner) FlushChecksum( data, err := json.Marshal(checksumInfo) if err != nil { - recordErr(err) + cr.RecordError(err) return } fname := fmt.Sprintf("%s/t%d_and__", CheckpointChecksumDir, tableID) err = s.WriteFile(ctx, fname, data) if err != nil { - recordErr(err) + cr.RecordError(err) return } }) return nil } +type GlobalTimer interface { + GetTS(context.Context) (int64, int64, error) +} + type CheckpointRunner struct { + lockId uint64 + meta map[string]*RangeGroups checksumRunner *ChecksumRunner storage storage.ExternalStorage cipher *backuppb.CipherInfo + timer GlobalTimer appendCh chan *CheckpointMessage metaCh chan map[string]*RangeGroups + lockCh chan struct{} errCh chan error wg sync.WaitGroup } // only for test -func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration) *CheckpointRunner { +func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) { runner := &CheckpointRunner{ meta: make(map[string]*RangeGroups), @@ -238,17 +253,23 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS storage: storage, cipher: cipher, + timer: timer, appendCh: make(chan *CheckpointMessage), metaCh: make(chan map[string]*RangeGroups), + lockCh: make(chan struct{}), errCh: make(chan error, 1), } - runner.startCheckpointLoop(ctx, tick) - return runner + err := runner.initialLock(ctx) + if err != nil { + return nil, errors.Annotate(err, "Failed to initialize checkpoint lock.") + } + runner.startCheckpointLoop(ctx, tick, tick) + return runner, nil } -func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo) *CheckpointRunner { +func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) { runner := &CheckpointRunner{ meta: make(map[string]*RangeGroups), @@ -256,14 +277,20 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, storage: storage, cipher: cipher, + timer: timer, appendCh: make(chan *CheckpointMessage), metaCh: make(chan map[string]*RangeGroups), + lockCh: make(chan struct{}), errCh: make(chan error, 1), } - runner.startCheckpointLoop(ctx, tickDuration) - return runner + err := runner.initialLock(ctx) + if err != nil { + return nil, errors.Trace(err) + } + runner.startCheckpointLoop(ctx, tickDurationForFlush, tickDurationForLock) + return runner, nil } func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error { @@ -295,13 +322,18 @@ func (r *CheckpointRunner) Append( } // Note: Cannot be parallel with `Append` function -func (r *CheckpointRunner) WaitForFinish() { +func (r *CheckpointRunner) WaitForFinish(ctx context.Context) { // can not append anymore close(r.appendCh) // wait the range flusher exit r.wg.Wait() // wait the checksum flusher exit r.checksumRunner.wg.Wait() + // remove the checkpoint lock + err := r.storage.DeleteFile(ctx, CheckpointLockPath) + if err != nil { + log.Warn("failed to remove the checkpoint lock", zap.Error(err)) + } } // Send the meta to the flush goroutine, and reset the CheckpointRunner's meta @@ -318,6 +350,16 @@ func (r *CheckpointRunner) flushMeta(ctx context.Context, errCh chan error) erro return nil } +func (r *CheckpointRunner) setLock(ctx context.Context, errCh chan error) error { + select { + case <-ctx.Done(): + case err := <-errCh: + return err + case r.lockCh <- struct{}{}: + } + return nil +} + // start a goroutine to flush the meta, which is sent from `checkpoint looper`, to the external storage func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.WaitGroup) chan error { errCh := make(chan error, 1) @@ -337,6 +379,15 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W errCh <- err return } + case _, ok := <-r.lockCh: + if !ok { + log.Info("stop checkpoint flush worker") + return + } + if err := r.updateLock(ctx); err != nil { + errCh <- errors.Annotate(err, "Failed to update checkpoint lock.") + return + } } } } @@ -351,9 +402,10 @@ func (r *CheckpointRunner) sendError(err error) { default: log.Error("errCh is blocked", logutil.ShortError(err)) } + r.checksumRunner.RecordError(err) } -func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) { +func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) { r.wg.Add(1) checkpointLoop := func(ctx context.Context) { defer r.wg.Done() @@ -361,13 +413,20 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration defer cancel() var wg sync.WaitGroup errCh := r.startCheckpointRunner(cctx, &wg) - ticker := time.NewTicker(tickDuration) - defer ticker.Stop() + flushTicker := time.NewTicker(tickDurationForFlush) + defer flushTicker.Stop() + lockTicker := time.NewTicker(tickDurationForLock) + defer lockTicker.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-lockTicker.C: + if err := r.setLock(ctx, errCh); err != nil { + r.sendError(err) + return + } + case <-flushTicker.C: if err := r.flushMeta(ctx, errCh); err != nil { r.sendError(err) return @@ -381,6 +440,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration // close the channel to flush worker // and wait it to consumes all the metas close(r.metaCh) + close(r.lockCh) wg.Wait() return } @@ -464,6 +524,115 @@ func (r *CheckpointRunner) doFlush(ctx context.Context, meta map[string]*RangeGr return nil } +type CheckpointLock struct { + LockId uint64 `json:"lock-id"` + ExpireAt int64 `json:"expire-at"` +} + +// get ts with retry +func (r *CheckpointRunner) getTS(ctx context.Context) (int64, int64, error) { + var ( + p int64 = 0 + l int64 = 0 + retry int = 0 + ) + errRetry := utils.WithRetry(ctx, func() error { + var err error + p, l, err = r.timer.GetTS(ctx) + if err != nil { + retry++ + log.Info("failed to get ts", zap.Int("retry", retry), zap.Error(err)) + return err + } + + return nil + }, utils.NewPDReqBackoffer()) + + return p, l, errors.Trace(errRetry) +} + +// flush the lock to the external storage +func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error { + lock := &CheckpointLock{ + LockId: r.lockId, + ExpireAt: p + lockTimeToLive.Milliseconds(), + } + log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt)) + data, err := json.Marshal(lock) + if err != nil { + return errors.Trace(err) + } + + err = r.storage.WriteFile(ctx, CheckpointLockPath, data) + return errors.Trace(err) +} + +// check whether this lock belongs to this BR +func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error { + data, err := r.storage.ReadFile(ctx, CheckpointLockPath) + if err != nil { + return errors.Trace(err) + } + lock := &CheckpointLock{} + err = json.Unmarshal(data, lock) + if err != nil { + return errors.Trace(err) + } + if lock.ExpireAt <= now { + if lock.LockId > r.lockId { + return errors.Errorf("There are another BR(%d) running after but setting lock before this one(%d). "+ + "Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId) + } + if lock.LockId == r.lockId { + log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now)) + } + } else if lock.LockId != r.lockId { + return errors.Errorf("The existing lock will expire in %d seconds. "+ + "There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.", + (lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath) + } + + return nil +} + +// generate a new lock and flush the lock to the external storage +func (r *CheckpointRunner) updateLock(ctx context.Context) error { + p, _, err := r.getTS(ctx) + if err != nil { + return errors.Trace(err) + } + if err = r.checkLockFile(ctx, p); err != nil { + return errors.Trace(err) + } + return errors.Trace(r.flushLock(ctx, p)) +} + +// Attempt to initialize the lock. Need to stop the backup when there is an unexpired locks. +func (r *CheckpointRunner) initialLock(ctx context.Context) error { + p, l, err := r.getTS(ctx) + if err != nil { + return errors.Trace(err) + } + r.lockId = oracle.ComposeTS(p, l) + exist, err := r.storage.FileExists(ctx, CheckpointLockPath) + if err != nil { + return errors.Trace(err) + } + if exist { + if err := r.checkLockFile(ctx, p); err != nil { + return errors.Trace(err) + } + } + if err = r.flushLock(ctx, p); err != nil { + return errors.Trace(err) + } + + // wait for 3 seconds to check whether the lock file is overwritten by another BR + time.Sleep(3 * time.Second) + err = r.checkLockFile(ctx, p) + return errors.Trace(err) +} + // walk the whole checkpoint range files and retrieve the metadatat of backed up ranges // and return the total time cost in the past executions func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) { diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go index f52b1c6ad2b02..29d8b5aa993ac 100644 --- a/br/pkg/checkpoint/checkpoint_test.go +++ b/br/pkg/checkpoint/checkpoint_test.go @@ -16,6 +16,7 @@ package checkpoint_test import ( "context" + "encoding/json" "os" "strings" "testing" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func TestCheckpointMeta(t *testing.T) { @@ -49,6 +51,19 @@ func TestCheckpointMeta(t *testing.T) { require.Equal(t, checkpointMeta.BackupTS, checkpointMeta2.BackupTS) } +type mockTimer struct { + p int64 + l int64 +} + +func NewMockTimer(p, l int64) *mockTimer { + return &mockTimer{p: p, l: l} +} + +func (t *mockTimer) GetTS(ctx context.Context) (int64, int64, error) { + return t.p, t.l, nil +} + func TestCheckpointRunner(t *testing.T) { ctx := context.Background() base := t.TempDir() @@ -61,7 +76,8 @@ func TestCheckpointRunner(t *testing.T) { CipherType: encryptionpb.EncryptionMethod_AES256_CTR, CipherKey: []byte("01234567890123456789012345678901"), } - checkpointRunner := checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second) + checkpointRunner, err := checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second, NewMockTimer(10, 10)) + require.NoError(t, err) data := map[string]struct { StartKey string @@ -128,7 +144,7 @@ func TestCheckpointRunner(t *testing.T) { require.NoError(t, err) } - checkpointRunner.WaitForFinish() + checkpointRunner.WaitForFinish(ctx) checker := func(groupKey string, resp *rtree.Range) { require.NotNil(t, resp) @@ -173,3 +189,41 @@ func TestCheckpointRunner(t *testing.T) { require.NoError(t, err) require.Equal(t, count, 2) } + +func getLockData(p, l int64) ([]byte, error) { + lock := checkpoint.CheckpointLock{ + LockId: oracle.ComposeTS(p, l), + ExpireAt: p + 10, + } + return json.Marshal(lock) +} + +func TestCheckpointRunnerLock(t *testing.T) { + ctx := context.Background() + base := t.TempDir() + s, err := storage.NewLocalStorage(base) + require.NoError(t, err) + os.MkdirAll(base+checkpoint.CheckpointDataDir, 0o755) + os.MkdirAll(base+checkpoint.CheckpointChecksumDir, 0o755) + + cipher := &backuppb.CipherInfo{ + CipherType: encryptionpb.EncryptionMethod_AES256_CTR, + CipherKey: []byte("01234567890123456789012345678901"), + } + + data, err := getLockData(10, 20) + require.NoError(t, err) + err = s.WriteFile(ctx, checkpoint.CheckpointLockPath, data) + require.NoError(t, err) + + _, err = checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second, NewMockTimer(10, 10)) + require.Error(t, err) + + runner, err := checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second, NewMockTimer(30, 10)) + require.NoError(t, err) + + _, err = checkpoint.StartCheckpointRunnerForTest(ctx, s, cipher, 5*time.Second, NewMockTimer(40, 10)) + require.Error(t, err) + + runner.WaitForFinish(ctx) +} diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index 2d6e0571faa93..0259e715c7968 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" @@ -38,7 +39,7 @@ func (l *LocalStorage) DeleteFile(_ context.Context, name string) error { func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) error { // because `os.WriteFile` is not atomic, directly write into it may reset the file // to an empty file if write is not finished. - tmpPath := filepath.Join(l.base, name) + ".tmp" + tmpPath := filepath.Join(l.base, name) + ".tmp." + uuid.NewString() if err := os.WriteFile(tmpPath, data, localFilePerm); err != nil { path := filepath.Dir(tmpPath) log.Info("failed to write file, try to mkdir the path", zap.String("path", path)) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 63e3f43ae5d30..0033324037e90 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -597,7 +597,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig defer func() { if !gcSafePointKeeperRemovable { log.Info("wait for flush checkpoint...") - client.WaitForFinishCheckpoint() + client.WaitForFinishCheckpoint(ctx) } }() }