ddl: check local file existence before resume checkpoint #53072

Merged 4 commits on May 7, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_operators.go
@@ -829,7 +829,7 @@ func (s *indexWriteResultSink) flush() error {
 	})
 	for _, index := range s.indexes {
 		idxInfo := index.Meta()
-		_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
+		_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport)
 		if err != nil {
 			if common.ErrFoundDuplicateKeys.Equal(err) {
 				err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
@@ -397,7 +397,7 @@ func (b *ingestBackfillScheduler) close(force bool) {
 		b.writerPool.ReleaseAndWait()
 	}
 	if b.checkpointMgr != nil {
-		b.checkpointMgr.Sync()
+		b.checkpointMgr.Flush()
 		// Get the latest status after all workers are closed so that the result is more accurate.
 		cnt, nextKey := b.checkpointMgr.Status()
 		b.sendResult(&backfillResult{
@@ -585,7 +585,7 @@ func (w *addIndexIngestWorker) HandleTask(rs IndexRecordChunk, _ func(workerpool
 		cnt, nextKey := w.checkpointMgr.Status()
 		result.totalCount = cnt
 		result.nextKey = nextKey
-		result.err = w.checkpointMgr.UpdateCurrent(rs.ID, count)
+		result.err = w.checkpointMgr.UpdateWrittenKeys(rs.ID, count)
 	} else {
 		result.addedCount = count
 		result.scanCount = count
4 changes: 2 additions & 2 deletions pkg/ddl/index_cop.go
@@ -121,7 +121,7 @@ func (c *copReqSender) run() {
 		if !ok {
 			return
 		}
-		if p.checkpointMgr != nil && p.checkpointMgr.IsComplete(task.endKey) {
+		if p.checkpointMgr != nil && p.checkpointMgr.IsKeyProcessed(task.endKey) {
 			logutil.Logger(p.ctx).Info("checkpoint detected, skip a cop-request task",
 				zap.Int("task ID", task.id),
 				zap.String("task end key", hex.EncodeToString(task.endKey)))
@@ -163,7 +163,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
 			return err
 		}
 		if p.checkpointMgr != nil {
-			p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
+			p.checkpointMgr.UpdateTotalKeys(task.id, srcChk.NumRows(), done)
 		}
 		idxRs := IndexRecordChunk{ID: task.id, Chunk: srcChk, Done: done}
 		rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds()
44 changes: 20 additions & 24 deletions pkg/ddl/ingest/backend.go
@@ -63,14 +63,14 @@ type BackendCtx interface {
 type FlushMode byte
 
 const (
-	// FlushModeAuto means flush when the memory table size reaches the threshold.
+	// FlushModeAuto means caller does not enforce any flush, the implementation can
+	// decide it.
 	FlushModeAuto FlushMode = iota
-	// FlushModeForceLocal means flush all data to local storage.
-	FlushModeForceLocal
-	// FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota.
-	FlushModeForceLocalAndCheckDiskQuota
-	// FlushModeForceGlobal means import all data in local storage to global storage.
-	FlushModeForceGlobal
+	// FlushModeForceFlushNoImport means flush all data to local storage, but don't
+	// import the data to TiKV.
+	FlushModeForceFlushNoImport
+	// FlushModeForceFlushAndImport means flush and import all data to TiKV.
+	FlushModeForceFlushAndImport
 )
 
 // litBackendCtx store a backend info for add index reorg task.
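
For reference while reading the call-site changes above and below, here is a minimal sketch (not part of this PR) of forcing a full flush-and-import with the renamed mode. The BackendCtx and Flush names and the mode constant come from this diff; the import path, package name, and helper function are assumptions for illustration.

package example

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/ddl/ingest" // assumed import path for the package changed in this file
)

// flushAndImportAll mirrors the indexWriteResultSink.flush call site changed in
// backfilling_operators.go: flush every index engine and import it into TiKV.
// Error handling is simplified; the real code also converts duplicate-key errors.
func flushAndImportAll(bc ingest.BackendCtx, indexIDs []int64) error {
	for _, id := range indexIDs {
		// FlushModeForceFlushAndImport replaces the old FlushModeForceGlobal.
		if _, _, err := bc.Flush(id, ingest.FlushModeForceFlushAndImport); err != nil {
			return fmt.Errorf("flush and import index %d: %w", id, err)
		}
	}
	return nil
}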
@@ -183,7 +183,7 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
 		return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found")
 	}
 
-	shouldFlush, shouldImport := bc.ShouldSync(mode)
+	shouldFlush, shouldImport := bc.checkFlush(mode)
 	if !shouldFlush {
 		return false, false, nil
 	}
@@ -268,28 +268,24 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
 // ForceSyncFlagForTest is a flag to force sync only for test.
 var ForceSyncFlagForTest = false
 
-func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImport bool) {
-	if mode == FlushModeForceGlobal || ForceSyncFlagForTest {
+func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport bool) {
+	if mode == FlushModeForceFlushAndImport || ForceSyncFlagForTest {
 		return true, true
 	}
-	if mode == FlushModeForceLocal {
+	if mode == FlushModeForceFlushNoImport {
 		return true, false
 	}
 	bc.diskRoot.UpdateUsage()
 	shouldImport = bc.diskRoot.ShouldImport()
-	if mode == FlushModeForceLocalAndCheckDiskQuota {
-		shouldFlush = true
-	} else {
-		interval := bc.updateInterval
-		// This failpoint will be manually set through HTTP status port.
-		failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) {
-			if v, ok := val.(int); ok {
-				interval = time.Duration(v) * time.Millisecond
-			}
-		})
-		shouldFlush = shouldImport ||
-			time.Since(bc.timeOfLastFlush.Load()) >= interval
-	}
+	interval := bc.updateInterval
+	// This failpoint will be manually set through HTTP status port.
+	failpoint.Inject("mockSyncIntervalMs", func(val failpoint.Value) {
+		if v, ok := val.(int); ok {
+			interval = time.Duration(v) * time.Millisecond
+		}
+	})
+	shouldFlush = shouldImport ||
+		time.Since(bc.timeOfLastFlush.Load()) >= interval
 	return shouldFlush, shouldImport
 }
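
To summarize the new decision logic, here is a self-contained sketch (not code from this PR) that restates checkFlush with local copies of the renamed mode constants; it omits ForceSyncFlagForTest and the failpoint hook shown above.

package main

import (
	"fmt"
	"time"
)

// FlushMode mirrors the renamed constants in pkg/ddl/ingest/backend.go.
type FlushMode byte

const (
	FlushModeAuto FlushMode = iota
	FlushModeForceFlushNoImport
	FlushModeForceFlushAndImport
)

// checkFlush restates the simplified decision: forced modes short-circuit,
// otherwise disk pressure or an elapsed interval triggers a flush.
func checkFlush(mode FlushMode, diskSaysImport bool, sinceLastFlush, interval time.Duration) (shouldFlush, shouldImport bool) {
	switch mode {
	case FlushModeForceFlushAndImport:
		return true, true // flush local data and import it into TiKV
	case FlushModeForceFlushNoImport:
		return true, false // flush locally, skip the TiKV import
	default: // FlushModeAuto: the implementation decides
		shouldImport = diskSaysImport
		shouldFlush = shouldImport || sinceLastFlush >= interval
		return shouldFlush, shouldImport
	}
}

func main() {
	flush, imp := checkFlush(FlushModeAuto, false, 3*time.Second, 2*time.Second)
	fmt.Println(flush, imp) // true false: the interval elapsed, but no import is needed yet
}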
