From baf2d913ea13a51e5b0247afc57e8d0ba1e96deb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 7 May 2024 18:25:08 +0800 Subject: [PATCH] ddl: check local file existence before resume checkpoint (#53072) close pingcap/tidb#53009 --- pkg/ddl/backfilling_operators.go | 2 +- pkg/ddl/backfilling_scheduler.go | 4 +- pkg/ddl/index_cop.go | 4 +- pkg/ddl/ingest/backend.go | 44 ++-- pkg/ddl/ingest/checkpoint.go | 301 +++++++++++++++------------- pkg/ddl/ingest/checkpoint_test.go | 96 +++++---- pkg/ddl/ingest/flush.go | 1 + pkg/ddl/ingest/mock.go | 11 +- pkg/ddl/internal/session/session.go | 2 + pkg/ddl/reorg.go | 9 +- pkg/ddl/util/BUILD.bazel | 6 +- pkg/ddl/util/util.go | 7 + pkg/ddl/util/util_test.go | 34 ++++ 13 files changed, 315 insertions(+), 206 deletions(-) create mode 100644 pkg/ddl/util/util_test.go diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index a95dc8b426eef..75f7b9610ed2a 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -829,7 +829,7 @@ func (s *indexWriteResultSink) flush() error { }) for _, index := range s.indexes { idxInfo := index.Meta() - _, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal) + _, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport) if err != nil { if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta()) diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 6e3a0ee76fbda..05b522ee6630e 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -397,7 +397,7 @@ func (b *ingestBackfillScheduler) close(force bool) { b.writerPool.ReleaseAndWait() } if b.checkpointMgr != nil { - b.checkpointMgr.Sync() + b.checkpointMgr.Flush() // Get the latest status after all workers are closed so that the result is more accurate. 
cnt, nextKey := b.checkpointMgr.Status() b.sendResult(&backfillResult{ @@ -585,7 +585,7 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool cnt, nextKey := w.checkpointMgr.Status() result.totalCount = cnt result.nextKey = nextKey - result.err = w.checkpointMgr.UpdateCurrent(rs.ID, count) + result.err = w.checkpointMgr.UpdateWrittenKeys(rs.ID, count) } else { result.addedCount = count result.scanCount = count diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 58d6980ad9788..557d72ed900f5 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -121,7 +121,7 @@ func (c *copReqSender) run() { if !ok { return } - if p.checkpointMgr != nil && p.checkpointMgr.IsComplete(task.endKey) { + if p.checkpointMgr != nil && p.checkpointMgr.IsKeyProcessed(task.endKey) { logutil.Logger(p.ctx).Info("checkpoint detected, skip a cop-request task", zap.Int("task ID", task.id), zap.String("task end key", hex.EncodeToString(task.endKey))) @@ -163,7 +163,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session) return err } if p.checkpointMgr != nil { - p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done) + p.checkpointMgr.UpdateTotalKeys(task.id, srcChk.NumRows(), done) } idxRs := IndexRecordChunk{ID: task.id, Chunk: srcChk, Done: done} rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds() diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 4a382af699e18..7747502493283 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -63,14 +63,14 @@ type BackendCtx interface { type FlushMode byte const ( - // FlushModeAuto means flush when the memory table size reaches the threshold. + // FlushModeAuto means caller does not enforce any flush, the implementation can + // decide it. FlushModeAuto FlushMode = iota - // FlushModeForceLocal means flush all data to local storage. - FlushModeForceLocal - // FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota. - FlushModeForceLocalAndCheckDiskQuota - // FlushModeForceGlobal means import all data in local storage to global storage. - FlushModeForceGlobal + // FlushModeForceFlushNoImport means flush all data to local storage, but don't + // import the data to TiKV. + FlushModeForceFlushNoImport + // FlushModeForceFlushAndImport means flush and import all data to TiKV. + FlushModeForceFlushAndImport ) // litBackendCtx store a backend info for add index reorg task. @@ -183,7 +183,7 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") } - shouldFlush, shouldImport := bc.ShouldSync(mode) + shouldFlush, shouldImport := bc.checkFlush(mode) if !shouldFlush { return false, false, nil } @@ -268,28 +268,24 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { // ForceSyncFlagForTest is a flag to force sync only for test. 
var ForceSyncFlagForTest = false -func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImport bool) { - if mode == FlushModeForceGlobal || ForceSyncFlagForTest { +func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) { + if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest { return true, true } - if mode == FlushModeForceLocal { + if mode == FlushModeForceFlushNoImport { return true, false } bc.diskRoot.UpdateUsage() shouldImport = bc.diskRoot.ShouldImport() - if mode == FlushModeForceLocalAndCheckDiskQuota { - shouldFlush = true - } else { - interval := bc.updateInterval - // This failpoint will be manually set through HTTP status port. - failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) { - if v, ok := val.(int); ok { - interval = time.Duration(v) * time.Millisecond - } - }) - shouldFlush = shouldImport || - time.Since(bc.timeOfLastFlush.Load()) >= interval - } + interval := bc.updateInterval + // This failpoint will be manually set through HTTP status port. + failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) { + if v, ok := val.(int); ok { + interval = time.Duration(v) * time.Millisecond + } + }) + shouldFlush = shouldImport || + time.Since(bc.timeOfLastFlush.Load()) >= interval return shouldFlush, shouldImport } diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index cde9bcbbc20ea..7b7b73f5e3aaf 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -34,52 +34,57 @@ import ( "go.uber.org/zap" ) -// CheckpointManager is a checkpoint manager implementation that used by non-distributed reorganization. +// CheckpointManager is a checkpoint manager implementation that used by +// non-distributed reorganization. It manages the data as two-level checkpoints: +// "flush"ed to local storage and "import"ed to TiKV. The checkpoint is saved in +// a table in the TiDB cluster. type CheckpointManager struct { - ctx context.Context - flushCtrl FlushController - sessPool *sess.Pool - jobID int64 - indexIDs []int64 + ctx context.Context + cancel context.CancelFunc + flushCtrl FlushController + sessPool *sess.Pool + jobID int64 + indexIDs []int64 + localStoreDir string + logger *zap.Logger // Derived and unchanged after the initialization. instanceAddr string localDataIsValid bool // Live in memory. - checkpoints map[int]*TaskCheckpoint // task ID -> checkpoint - mu sync.Mutex - minTaskIDSynced int - dirty bool + mu sync.Mutex + checkpoints map[int]*taskCheckpoint // task ID -> checkpoint + // we require each task ID to be continuous and start from 1. + minTaskIDFinished int + dirty bool // Local meta. - pidLocal int64 - startLocal kv.Key - endLocal kv.Key + pidFlushed int64 + startKeyFlushed kv.Key + endKeyFlushed kv.Key // Persisted to the storage. - minKeySyncLocal kv.Key - minKeySyncGlobal kv.Key - localCnt int - globalCnt int + minFlushedKey kv.Key + minImportedKey kv.Key + flushedKeyCnt int + importedKeyCnt int // Global meta. - pidGlobal int64 - startGlobal kv.Key - endGlobal kv.Key + pidImported int64 + startKeyImported kv.Key + endKeyImported kv.Key // For persisting the checkpoint periodically. - updating bool - updaterWg sync.WaitGroup - updaterCh chan *sync.WaitGroup - updaterExitCh chan struct{} + updaterWg sync.WaitGroup + updaterCh chan chan struct{} } -// TaskCheckpoint is the checkpoint for a single task. -type TaskCheckpoint struct { +// taskCheckpoint is the checkpoint for a single task. 
+type taskCheckpoint struct { totalKeys int - currentKeys int + writtenKeys int checksum int64 endKey kv.Key - lastBatchSent bool + lastBatchRead bool } // FlushController is an interface to control the flush of the checkpoint. @@ -88,21 +93,33 @@ type FlushController interface { } // NewCheckpointManager creates a new checkpoint manager. -func NewCheckpointManager(ctx context.Context, flushCtrl FlushController, - sessPool *sess.Pool, jobID int64, indexIDs []int64) (*CheckpointManager, error) { - instanceAddr := InitInstanceAddr() +func NewCheckpointManager( + ctx context.Context, + flushCtrl FlushController, + sessPool *sess.Pool, + jobID int64, + indexIDs []int64, + localStoreDir string, +) (*CheckpointManager, error) { + instanceAddr := InstanceAddr() + ctx2, cancel := context.WithCancel(ctx) + logger := logutil.DDLIngestLogger().With( + zap.Int64("jobID", jobID), zap.Int64s("indexIDs", indexIDs)) + cm := &CheckpointManager{ - ctx: ctx, + ctx: ctx2, + cancel: cancel, flushCtrl: flushCtrl, sessPool: sessPool, jobID: jobID, indexIDs: indexIDs, - checkpoints: make(map[int]*TaskCheckpoint, 16), + localStoreDir: localStoreDir, + logger: logger, + checkpoints: make(map[int]*taskCheckpoint, 16), mu: sync.Mutex{}, instanceAddr: instanceAddr, updaterWg: sync.WaitGroup{}, - updaterExitCh: make(chan struct{}), - updaterCh: make(chan *sync.WaitGroup), + updaterCh: make(chan chan struct{}), } err := cm.resumeCheckpoint() if err != nil { @@ -113,65 +130,70 @@ func NewCheckpointManager(ctx context.Context, flushCtrl FlushController, cm.updateCheckpointLoop() cm.updaterWg.Done() }() - logutil.DDLIngestLogger().Info("create checkpoint manager", - zap.Int64("jobID", jobID), zap.Int64s("indexIDs", indexIDs)) + logger.Info("create checkpoint manager") return cm, nil } -// InitInstanceAddr returns the string concat with instance address and temp-dir. -func InitInstanceAddr() string { +// InstanceAddr returns the string concat with instance address and temp-dir. +func InstanceAddr() string { cfg := config.GetGlobalConfig() dsn := net.JoinHostPort(cfg.AdvertiseAddress, strconv.Itoa(int(cfg.Port))) return fmt.Sprintf("%s:%s", dsn, cfg.TempDir) } -// IsComplete checks if the task is complete. -// This is called before the reader reads the data and decides whether to skip the current task. -func (s *CheckpointManager) IsComplete(end kv.Key) bool { +// IsKeyProcessed checks if the key is processed. The key may not be imported. +// This is called before the reader reads the data and decides whether to skip +// the current task. +func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool { s.mu.Lock() defer s.mu.Unlock() - if len(s.minKeySyncGlobal) > 0 && end.Cmp(s.minKeySyncGlobal) <= 0 { + if len(s.minImportedKey) > 0 && end.Cmp(s.minImportedKey) <= 0 { return true } - return s.localDataIsValid && len(s.minKeySyncLocal) > 0 && end.Cmp(s.minKeySyncLocal) <= 0 + return s.localDataIsValid && len(s.minFlushedKey) > 0 && end.Cmp(s.minFlushedKey) <= 0 } // Status returns the status of the checkpoint. -func (s *CheckpointManager) Status() (int, kv.Key) { +func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) { s.mu.Lock() defer s.mu.Unlock() total := 0 for _, cp := range s.checkpoints { - total += cp.currentKeys + total += cp.writtenKeys } - return s.localCnt + total, s.minKeySyncGlobal + // TODO(lance6716): ??? + return s.flushedKeyCnt + total, s.minImportedKey } -// Register registers a new task. +// Register registers a new task. taskID MUST be continuous ascending and start +// from 1. 
+// +// TODO(lance6716): remove this constraint, use endKey as taskID and use +// ordered map type for checkpoints. func (s *CheckpointManager) Register(taskID int, end kv.Key) { s.mu.Lock() defer s.mu.Unlock() - s.checkpoints[taskID] = &TaskCheckpoint{ + s.checkpoints[taskID] = &taskCheckpoint{ endKey: end, } } -// UpdateTotal updates the total keys of the task. +// UpdateTotalKeys updates the total keys of the task. // This is called by the reader after reading the data to update the number of rows contained in the current chunk. -func (s *CheckpointManager) UpdateTotal(taskID int, added int, last bool) { +func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool) { s.mu.Lock() defer s.mu.Unlock() cp := s.checkpoints[taskID] - cp.totalKeys += added - cp.lastBatchSent = last + cp.totalKeys += delta + cp.lastBatchRead = last } -// UpdateCurrent updates the current keys of the task. +// UpdateWrittenKeys updates the written keys of the task. // This is called by the writer after writing the local engine to update the current number of rows written. -func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { +func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error { s.mu.Lock() cp := s.checkpoints[taskID] - cp.currentKeys += added + cp.writtenKeys += delta s.mu.Unlock() flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs) @@ -181,70 +203,82 @@ func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error { s.mu.Lock() defer s.mu.Unlock() - s.progressLocalSyncMinKey() - if imported && s.minKeySyncGlobal.Cmp(s.minKeySyncLocal) != 0 { - s.minKeySyncGlobal = s.minKeySyncLocal - s.globalCnt = s.localCnt + s.afterFlush() + if imported && s.minImportedKey.Cmp(s.minFlushedKey) != 0 { + // TODO(lance6716): add warning log if cmp > 0 + s.minImportedKey = s.minFlushedKey + s.importedKeyCnt = s.flushedKeyCnt s.dirty = true - s.pidGlobal = s.pidLocal - s.startGlobal = s.startLocal - s.endGlobal = s.endLocal + s.pidImported = s.pidFlushed + s.startKeyImported = s.startKeyFlushed + s.endKeyImported = s.endKeyFlushed } return nil } -func (s *CheckpointManager) progressLocalSyncMinKey() { +// afterFlush should be called after all engine is flushed. +func (s *CheckpointManager) afterFlush() { for { - cp := s.checkpoints[s.minTaskIDSynced+1] - if cp == nil || !cp.lastBatchSent || cp.currentKeys < cp.totalKeys { + cp := s.checkpoints[s.minTaskIDFinished+1] + if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys { break } - s.minTaskIDSynced++ - s.minKeySyncLocal = cp.endKey - s.localCnt += cp.totalKeys - delete(s.checkpoints, s.minTaskIDSynced) + s.minTaskIDFinished++ + s.minFlushedKey = cp.endKey + s.flushedKeyCnt += cp.totalKeys + delete(s.checkpoints, s.minTaskIDFinished) s.dirty = true } } // Close closes the checkpoint manager. func (s *CheckpointManager) Close() { - s.updaterExitCh <- struct{}{} + s.cancel() s.updaterWg.Wait() - logutil.DDLIngestLogger().Info("close checkpoint manager", - zap.Int64("jobID", s.jobID), zap.Int64s("indexIDs", s.indexIDs)) + s.logger.Info("close checkpoint manager") } -// Sync syncs the checkpoint. -func (s *CheckpointManager) Sync() { - _, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceLocal, s.indexIDs) +// Flush flushed the data and updates checkpoint. +func (s *CheckpointManager) Flush() { + // use FlushModeForceFlushNoImport to finish the flush process timely. 
+ _, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceFlushNoImport, s.indexIDs) if err != nil { - logutil.DDLIngestLogger().Warn("flush local engine failed", zap.Error(err)) + s.logger.Warn("flush local engine failed", zap.Error(err)) } s.mu.Lock() - s.progressLocalSyncMinKey() + s.afterFlush() s.mu.Unlock() - wg := sync.WaitGroup{} - wg.Add(1) - s.updaterCh <- &wg - wg.Wait() + + finishCh := make(chan struct{}) + select { + case s.updaterCh <- finishCh: + case <-s.ctx.Done(): + return + } + // wait updateCheckpointLoop to finish checkpoint update. + select { + case <-finishCh: + case <-s.ctx.Done(): + } } // Reset resets the checkpoint manager between two partitions. func (s *CheckpointManager) Reset(newPhysicalID int64, start, end kv.Key) { s.mu.Lock() defer s.mu.Unlock() - logutil.DDLIngestLogger().Info("reset checkpoint manager", - zap.Int64("newPhysicalID", newPhysicalID), zap.Int64("oldPhysicalID", s.pidLocal), - zap.Int64s("indexIDs", s.indexIDs), zap.Int64("jobID", s.jobID), zap.Int("localCnt", s.localCnt)) - if s.pidLocal != newPhysicalID { - s.minKeySyncLocal = nil - s.minKeySyncGlobal = nil - s.minTaskIDSynced = 0 - s.pidLocal = newPhysicalID - s.startLocal = start - s.endLocal = end + + s.logger.Info("reset checkpoint manager", + zap.Int64("newPhysicalID", newPhysicalID), + zap.Int64("oldPhysicalID", s.pidFlushed), + zap.Int("flushedKeyCnt", s.flushedKeyCnt)) + if s.pidFlushed != newPhysicalID { + s.minFlushedKey = nil + s.minImportedKey = nil + s.minTaskIDFinished = 0 + s.pidFlushed = newPhysicalID + s.startKeyFlushed = start + s.endKeyFlushed = end } } @@ -299,47 +333,41 @@ func (s *CheckpointManager) resumeCheckpoint() error { return errors.Trace(err) } if cp := reorgMeta.Checkpoint; cp != nil { - s.minKeySyncGlobal = cp.GlobalSyncKey - s.globalCnt = cp.GlobalKeyCount - s.pidGlobal = cp.PhysicalID - s.startGlobal = cp.StartKey - s.endGlobal = cp.EndKey - if s.instanceAddr == cp.InstanceAddr || cp.InstanceAddr == "" /* initial state */ { + s.minImportedKey = cp.GlobalSyncKey + s.importedKeyCnt = cp.GlobalKeyCount + s.pidImported = cp.PhysicalID + s.startKeyImported = cp.StartKey + s.endKeyImported = cp.EndKey + if util.FolderNotEmpty(s.localStoreDir) && + (s.instanceAddr == cp.InstanceAddr || cp.InstanceAddr == "" /* initial state */) { s.localDataIsValid = true - s.minKeySyncLocal = cp.LocalSyncKey - s.localCnt = cp.LocalKeyCount + s.minFlushedKey = cp.LocalSyncKey + s.flushedKeyCnt = cp.LocalKeyCount } - logutil.DDLIngestLogger().Info("resume checkpoint", - zap.Int64("job ID", s.jobID), zap.Int64s("index IDs", s.indexIDs), - zap.String("local checkpoint", hex.EncodeToString(s.minKeySyncLocal)), - zap.String("global checkpoint", hex.EncodeToString(s.minKeySyncGlobal)), + s.logger.Info("resume checkpoint", + zap.String("minimum flushed key", hex.EncodeToString(s.minFlushedKey)), + zap.String("minimum imported key", hex.EncodeToString(s.minImportedKey)), zap.Int64("physical table ID", cp.PhysicalID), zap.String("previous instance", cp.InstanceAddr), zap.String("current instance", s.instanceAddr)) return nil } - logutil.DDLIngestLogger().Info("checkpoint is empty", - zap.Int64("job ID", s.jobID), zap.Int64s("index IDs", s.indexIDs)) + s.logger.Info("checkpoint is empty") return nil }) } +// updateCheckpoint is only used by updateCheckpointLoop goroutine. 
func (s *CheckpointManager) updateCheckpoint() error { s.mu.Lock() - currentLocalKey := s.minKeySyncLocal - currentGlobalKey := s.minKeySyncGlobal - currentLocalCnt := s.localCnt - currentGlobalCnt := s.globalCnt - currentGlobalPID := s.pidGlobal - currentGlobalStart := s.startGlobal - currentGlobalEnd := s.endGlobal - s.updating = true + minKeyFlushed := s.minFlushedKey + minKeyImported := s.minImportedKey + flushedKeyCnt := s.flushedKeyCnt + importedKeyCnt := s.importedKeyCnt + pidImported := s.pidImported + startKeyImported := s.startKeyImported + endKeyImported := s.endKeyImported s.mu.Unlock() - defer func() { - s.mu.Lock() - s.updating = false - s.mu.Unlock() - }() sessCtx, err := s.sessPool.Get() if err != nil { @@ -350,14 +378,14 @@ func (s *CheckpointManager) updateCheckpoint() error { err = ddlSess.RunInTxn(func(se *sess.Session) error { template := "update mysql.tidb_ddl_reorg set reorg_meta = %s where job_id = %d and ele_type = %s;" cp := &ReorgCheckpoint{ - LocalSyncKey: currentLocalKey, - GlobalSyncKey: currentGlobalKey, - LocalKeyCount: currentLocalCnt, - GlobalKeyCount: currentGlobalCnt, + LocalSyncKey: minKeyFlushed, + GlobalSyncKey: minKeyImported, + LocalKeyCount: flushedKeyCnt, + GlobalKeyCount: importedKeyCnt, InstanceAddr: s.instanceAddr, - PhysicalID: currentGlobalPID, - StartKey: currentGlobalStart, - EndKey: currentGlobalEnd, + PhysicalID: pidImported, + StartKey: startKeyImported, + EndKey: endKeyImported, Version: JobCheckpointVersionCurrent, } rawReorgMeta, err := json.Marshal(JobReorgMeta{Checkpoint: cp}) @@ -375,11 +403,10 @@ func (s *CheckpointManager) updateCheckpoint() error { s.mu.Unlock() return nil }) - logutil.DDLIngestLogger().Info("update checkpoint", - zap.Int64("job ID", s.jobID), zap.Int64s("index IDs", s.indexIDs), - zap.String("local checkpoint", hex.EncodeToString(currentLocalKey)), - zap.String("global checkpoint", hex.EncodeToString(currentGlobalKey)), - zap.Int64("global physical ID", currentGlobalPID), + s.logger.Info("update checkpoint", + zap.String("local checkpoint", hex.EncodeToString(minKeyFlushed)), + zap.String("global checkpoint", hex.EncodeToString(minKeyImported)), + zap.Int64("global physical ID", pidImported), zap.Error(err)) return err } @@ -389,24 +416,24 @@ func (s *CheckpointManager) updateCheckpointLoop() { defer ticker.Stop() for { select { - case wg := <-s.updaterCh: + case finishCh := <-s.updaterCh: err := s.updateCheckpoint() if err != nil { - logutil.DDLIngestLogger().Error("update checkpoint failed", zap.Error(err)) + s.logger.Error("update checkpoint failed", zap.Error(err)) } - wg.Done() + close(finishCh) case <-ticker.C: s.mu.Lock() - if !s.dirty || s.updating { + if !s.dirty { s.mu.Unlock() continue } s.mu.Unlock() err := s.updateCheckpoint() if err != nil { - logutil.DDLIngestLogger().Error("update checkpoint failed", zap.Error(err)) + s.logger.Error("periodically update checkpoint failed", zap.Error(err)) } - case <-s.updaterExitCh: + case <-s.ctx.Done(): return } } diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index 55c701b229ae2..08a907255bcde 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -17,6 +17,8 @@ package ingest_test import ( "context" "encoding/json" + "os" + "path/filepath" "testing" "github.com/ngaut/pools" @@ -26,6 +28,12 @@ import ( "github.com/stretchr/testify/require" ) +func createDummyFile(t *testing.T, folder string) { + f, err := os.Create(filepath.Join(folder, "test-file")) + require.NoError(t, err) + 
require.NoError(t, f.Close()) +} + func TestCheckpointManager(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -38,45 +46,47 @@ func TestCheckpointManager(t *testing.T) { ctx := context.Background() sessPool := session.NewSessionPool(rs, store) flushCtrl := &dummyFlushCtrl{imported: false} - mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}) + tmpFolder := t.TempDir() + createDummyFile(t, tmpFolder) + mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder) require.NoError(t, err) defer mgr.Close() mgr.Register(1, []byte{'1', '9'}) mgr.Register(2, []byte{'2', '9'}) - mgr.UpdateTotal(1, 100, false) - require.False(t, mgr.IsComplete([]byte{'1', '9'})) - require.NoError(t, mgr.UpdateCurrent(1, 100)) - require.False(t, mgr.IsComplete([]byte{'1', '9'})) - mgr.UpdateTotal(1, 100, true) - require.NoError(t, mgr.UpdateCurrent(1, 100)) + mgr.UpdateTotalKeys(1, 100, false) + require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) + require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + mgr.UpdateTotalKeys(1, 100, true) + require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) // The data is not imported to the storage yet. - require.False(t, mgr.IsComplete([]byte{'1', '9'})) + require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) flushCtrl.imported = true // Mock the data is imported to the storage. - require.NoError(t, mgr.UpdateCurrent(2, 0)) - require.True(t, mgr.IsComplete([]byte{'1', '9'})) + require.NoError(t, mgr.UpdateWrittenKeys(2, 0)) + require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) // Only when the last batch is completed, the job can be completed. - mgr.UpdateTotal(2, 50, false) - mgr.UpdateTotal(2, 50, true) - require.NoError(t, mgr.UpdateCurrent(2, 50)) - require.True(t, mgr.IsComplete([]byte{'1', '9'})) - require.False(t, mgr.IsComplete([]byte{'2', '9'})) - require.NoError(t, mgr.UpdateCurrent(2, 50)) - require.True(t, mgr.IsComplete([]byte{'1', '9'})) - require.True(t, mgr.IsComplete([]byte{'2', '9'})) + mgr.UpdateTotalKeys(2, 50, false) + mgr.UpdateTotalKeys(2, 50, true) + require.NoError(t, mgr.UpdateWrittenKeys(2, 50)) + require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) + require.NoError(t, mgr.UpdateWrittenKeys(2, 50)) + require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'})) // Only when the subsequent job is completed, the previous job can be completed. 
mgr.Register(3, []byte{'3', '9'}) mgr.Register(4, []byte{'4', '9'}) mgr.Register(5, []byte{'5', '9'}) - mgr.UpdateTotal(3, 100, true) - mgr.UpdateTotal(4, 100, true) - mgr.UpdateTotal(5, 100, true) - require.NoError(t, mgr.UpdateCurrent(5, 100)) - require.NoError(t, mgr.UpdateCurrent(4, 100)) - require.False(t, mgr.IsComplete([]byte{'3', '9'})) - require.False(t, mgr.IsComplete([]byte{'4', '9'})) + mgr.UpdateTotalKeys(3, 100, true) + mgr.UpdateTotalKeys(4, 100, true) + mgr.UpdateTotalKeys(5, 100, true) + require.NoError(t, mgr.UpdateWrittenKeys(5, 100)) + require.NoError(t, mgr.UpdateWrittenKeys(4, 100)) + require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'})) + require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'})) } func TestCheckpointManagerUpdateReorg(t *testing.T) { @@ -91,14 +101,16 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) { ctx := context.Background() sessPool := session.NewSessionPool(rs, store) flushCtrl := &dummyFlushCtrl{imported: true} - mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}) + tmpFolder := t.TempDir() + createDummyFile(t, tmpFolder) + mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder) require.NoError(t, err) defer mgr.Close() mgr.Register(1, []byte{'1', '9'}) - mgr.UpdateTotal(1, 100, true) - require.NoError(t, mgr.UpdateCurrent(1, 100)) - mgr.Sync() // Wait the global checkpoint to be updated to the reorg table. + mgr.UpdateTotalKeys(1, 100, true) + require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) + mgr.Flush() // Wait the global checkpoint to be updated to the reorg table. r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;") require.NoError(t, err) req := r.NewChunk(nil) @@ -122,11 +134,11 @@ func TestCheckpointManagerResumeReorg(t *testing.T) { tk.MustExec("use test") reorgMeta := &ingest.JobReorgMeta{ Checkpoint: &ingest.ReorgCheckpoint{ - LocalSyncKey: []byte{'1', '9'}, + LocalSyncKey: []byte{'2', '9'}, LocalKeyCount: 100, - GlobalSyncKey: []byte{'2', '9'}, + GlobalSyncKey: []byte{'1', '9'}, GlobalKeyCount: 200, - InstanceAddr: ingest.InitInstanceAddr(), + InstanceAddr: ingest.InstanceAddr(), Version: 1, }, } @@ -140,14 +152,26 @@ func TestCheckpointManagerResumeReorg(t *testing.T) { ctx := context.Background() sessPool := session.NewSessionPool(rs, store) flushCtrl := &dummyFlushCtrl{imported: false} - mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}) + tmpFolder := t.TempDir() + // checkpoint manager should not use local checkpoint if the folder is empty + mgr, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder) require.NoError(t, err) defer mgr.Close() - require.True(t, mgr.IsComplete([]byte{'1', '9'})) - require.True(t, mgr.IsComplete([]byte{'2', '9'})) + require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) localCnt, globalNextKey := mgr.Status() + require.Equal(t, 0, localCnt) + require.EqualValues(t, []byte{'1', '9'}, globalNextKey) + + createDummyFile(t, tmpFolder) + mgr2, err := ingest.NewCheckpointManager(ctx, flushCtrl, sessPool, 1, []int64{1}, tmpFolder) + require.NoError(t, err) + defer mgr2.Close() + require.True(t, mgr2.IsKeyProcessed([]byte{'1', '9'})) + require.True(t, mgr2.IsKeyProcessed([]byte{'2', '9'})) + localCnt, globalNextKey = mgr2.Status() require.Equal(t, 100, localCnt) - require.EqualValues(t, []byte{'2', '9'}, globalNextKey) + require.EqualValues(t, []byte{'1', '9'}, globalNextKey) 
} type dummyFlushCtrl struct { diff --git a/pkg/ddl/ingest/flush.go b/pkg/ddl/ingest/flush.go index 4d407f8733eb1..a3cb89a3886f1 100644 --- a/pkg/ddl/ingest/flush.go +++ b/pkg/ddl/ingest/flush.go @@ -19,6 +19,7 @@ func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []in allFlushed := true allImported := true for _, idxID := range indexIDs { + // TODO(lance6716): use flushCtrl.Flush(indexIDs, mode) flushed, imported, err := flushCtrl.Flush(idxID, mode) if err != nil { return false, false, idxID, err diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 8829a6ca7c2fa..4393861003759 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -17,6 +17,9 @@ package ingest import ( "context" "encoding/hex" + "os" + "path/filepath" + "strconv" "sync" "github.com/pingcap/tidb/pkg/ddl/logutil" @@ -58,6 +61,7 @@ func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bo mockCtx := &MockBackendCtx{ mu: sync.Mutex{}, sessCtx: sessCtx, + jobID: jobID, } m.runningJobs[jobID] = mockCtx return mockCtx, nil @@ -96,6 +100,7 @@ func (m *MockBackendCtxMgr) ResetSessCtx() { type MockBackendCtx struct { sessCtx sessionctx.Context mu sync.Mutex + jobID int64 checkpointMgr *CheckpointManager } @@ -151,8 +156,10 @@ func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager { } // GetLocalBackend returns the local backend. -func (*MockBackendCtx) GetLocalBackend() *local.Backend { - return nil +func (m *MockBackendCtx) GetLocalBackend() *local.Backend { + b := &local.Backend{} + b.LocalStoreDir = filepath.Join(os.TempDir(), "mock_backend", strconv.FormatInt(m.jobID, 10)) + return b } // MockWriteHook the hook for write in mock engine. diff --git a/pkg/ddl/internal/session/session.go b/pkg/ddl/internal/session/session.go index 01b8d83afbc11..0242ca3f52c74 100644 --- a/pkg/ddl/internal/session/session.go +++ b/pkg/ddl/internal/session/session.go @@ -39,6 +39,8 @@ func NewSession(s sessionctx.Context) *Session { return &Session{s} } +// TODO(lance6716): provide a NewSessionWithCtx + // Begin starts a transaction. func (s *Session) Begin() error { err := sessiontxn.NewTxn(context.Background(), s.Context) diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index 51e7b3893e3b0..ac788ed3d8019 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -289,7 +289,14 @@ func overwriteReorgInfoFromGlobalCheckpoint(w *worker, sess *sess.Session, job * if ok { // We create the checkpoint manager here because we need to wait for the reorg meta to be initialized. 
if bc.GetCheckpointManager() == nil { - mgr, err := ingest.NewCheckpointManager(w.ctx, bc, w.sessPool, job.ID, extractElemIDs(reorgInfo)) + mgr, err := ingest.NewCheckpointManager( + w.ctx, + bc, + w.sessPool, + job.ID, + extractElemIDs(reorgInfo), + bc.GetLocalBackend().LocalStoreDir, + ) if err != nil { logutil.DDLIngestLogger().Warn("create checkpoint manager failed", zap.Error(err)) } diff --git a/pkg/ddl/util/BUILD.bazel b/pkg/ddl/util/BUILD.bazel index 71bef12a86dcd..abff61676b963 100644 --- a/pkg/ddl/util/BUILD.bazel +++ b/pkg/ddl/util/BUILD.bazel @@ -31,11 +31,15 @@ go_library( go_test( name = "util_test", timeout = "short", - srcs = ["main_test.go"], + srcs = [ + "main_test.go", + "util_test.go", + ], embed = [":util"], flaky = True, deps = [ "//pkg/testkit/testsetup", + "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", ], ) diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index a8c6bef2c86fc..f2b947e6a636f 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -19,6 +19,7 @@ import ( "context" "encoding/hex" "fmt" + "os" "strings" "time" @@ -403,3 +404,9 @@ func IsRaftKv2(ctx context.Context, sctx sessionctx.Context) (bool, error) { raftVersion := rows[0].GetString(0) return raftVersion == raftKv2, nil } + +// FolderNotEmpty returns true only when the folder is not empty. +func FolderNotEmpty(path string) bool { + entries, _ := os.ReadDir(path) + return len(entries) > 0 +} diff --git a/pkg/ddl/util/util_test.go b/pkg/ddl/util/util_test.go new file mode 100644 index 0000000000000..679bed1852379 --- /dev/null +++ b/pkg/ddl/util/util_test.go @@ -0,0 +1,34 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFolderNotEmpty(t *testing.T) { + tmp := t.TempDir() + require.False(t, FolderNotEmpty(tmp)) + require.False(t, FolderNotEmpty(filepath.Join(tmp, "not-exist"))) + + f, err := os.Create(filepath.Join(tmp, "test-file")) + require.NoError(t, err) + require.NoError(t, f.Close()) + require.True(t, FolderNotEmpty(tmp)) +}
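
The behavioral core of this patch is the extra FolderNotEmpty guard in resumeCheckpoint: the flushed ("local") part of a checkpoint is only reused when the previous owner matches this instance and the ingest temp directory still contains files. Below is a minimal, self-contained restatement of that condition; the helper name and the main() driver are illustrative, not part of the patch.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// canReuseLocalCheckpoint restates the guard added in resumeCheckpoint: besides
// the instance address matching (or being the initial empty address), the local
// ingest directory must still contain files on disk; otherwise the flushed key
// counts recorded in mysql.tidb_ddl_reorg describe data that no longer exists.
func canReuseLocalCheckpoint(prevInstanceAddr, curInstanceAddr, localStoreDir string) bool {
	entries, _ := os.ReadDir(localStoreDir) // a missing directory reads as empty
	folderNotEmpty := len(entries) > 0
	sameOwner := prevInstanceAddr == curInstanceAddr || prevInstanceAddr == "" // "" is the initial state
	return folderNotEmpty && sameOwner
}

func main() {
	dir, err := os.MkdirTemp("", "ingest")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	addr := "127.0.0.1:4000:/tmp/tidb"
	fmt.Println(canReuseLocalCheckpoint(addr, addr, dir)) // false: directory is empty

	if err := os.WriteFile(filepath.Join(dir, "engine-file"), []byte("x"), 0o600); err != nil {
		panic(err)
	}
	fmt.Println(canReuseLocalCheckpoint(addr, addr, dir)) // true
}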
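
The renamed bookkeeping in checkpoint.go (Register, UpdateTotalKeys, UpdateWrittenKeys, afterFlush, minTaskIDFinished) implements an ordered watermark: task IDs are continuous and start from 1, and the flushed watermark only moves past task N once every task up to N has read its last batch and written all of its keys. The sketch below is a toy, self-contained model of that rule, not the real ingest.CheckpointManager API.

package main

import "fmt"

// taskState is a simplified stand-in for the patch's taskCheckpoint.
type taskState struct {
	totalKeys     int
	writtenKeys   int
	endKey        string
	lastBatchRead bool
}

// watermark models the flushed-key watermark logic in afterFlush.
type watermark struct {
	tasks             map[int]*taskState
	minTaskIDFinished int
	minFlushedKey     string
	flushedKeyCnt     int
}

func newWatermark() *watermark {
	return &watermark{tasks: make(map[int]*taskState)}
}

func (w *watermark) register(taskID int, endKey string) {
	w.tasks[taskID] = &taskState{endKey: endKey}
}

func (w *watermark) updateTotalKeys(taskID, delta int, last bool) {
	t := w.tasks[taskID]
	t.totalKeys += delta
	t.lastBatchRead = last
}

func (w *watermark) updateWrittenKeys(taskID, delta int) {
	w.tasks[taskID].writtenKeys += delta
	w.afterFlush()
}

// afterFlush advances the watermark while the next unfinished task is fully
// read and fully written, exactly in task-ID order.
func (w *watermark) afterFlush() {
	for {
		t := w.tasks[w.minTaskIDFinished+1]
		if t == nil || !t.lastBatchRead || t.writtenKeys < t.totalKeys {
			return
		}
		w.minTaskIDFinished++
		w.minFlushedKey = t.endKey
		w.flushedKeyCnt += t.totalKeys
		delete(w.tasks, w.minTaskIDFinished)
	}
}

func main() {
	w := newWatermark()
	w.register(1, "19")
	w.register(2, "29")
	w.updateTotalKeys(1, 100, true)
	w.updateTotalKeys(2, 100, true)
	// Task 2 finishes first, but the watermark stays put until task 1 is done.
	w.updateWrittenKeys(2, 100)
	fmt.Println(w.minFlushedKey, w.flushedKeyCnt) // "" 0
	w.updateWrittenKeys(1, 100)
	fmt.Println(w.minFlushedKey, w.flushedKeyCnt) // "29" 200
}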
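
The rewritten Flush no longer passes a *sync.WaitGroup to the updater goroutine; it sends a fresh done-channel over updaterCh and also selects on the manager's context, so a caller can never block forever once Close has cancelled the context. A minimal, generic sketch of that handshake follows; the type and method names are illustrative, not the real API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// updater owns a background goroutine that persists state either periodically
// or on demand, mirroring the updaterCh / ctx.Done() handshake in the patch.
type updater struct {
	ctx    context.Context
	cancel context.CancelFunc
	reqCh  chan chan struct{} // each request carries its own completion channel
	wg     sync.WaitGroup
}

func newUpdater() *updater {
	ctx, cancel := context.WithCancel(context.Background())
	u := &updater{ctx: ctx, cancel: cancel, reqCh: make(chan chan struct{})}
	u.wg.Add(1)
	go u.loop()
	return u
}

func (u *updater) loop() {
	defer u.wg.Done()
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case done := <-u.reqCh:
			fmt.Println("on-demand persist")
			close(done) // wake up the waiting caller
		case <-ticker.C:
			fmt.Println("periodic persist")
		case <-u.ctx.Done():
			return
		}
	}
}

// syncPersist asks the loop to persist once and waits for it, but gives up as
// soon as the updater is closed so the caller cannot block forever.
func (u *updater) syncPersist() {
	done := make(chan struct{})
	select {
	case u.reqCh <- done:
	case <-u.ctx.Done():
		return
	}
	select {
	case <-done:
	case <-u.ctx.Done():
	}
}

func (u *updater) close() {
	u.cancel()
	u.wg.Wait()
}

func main() {
	u := newUpdater()
	u.syncPersist()
	u.close()
	u.syncPersist() // returns immediately after close
}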