diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a300aa7cfc6..7e91d7c5be9 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -415,7 +415,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* // be max uint64. In this case, we need to advance newResolvedTs to global barrier // ts and advance newCheckpointTs to min table barrier ts. if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 { - if newCheckpointTs != math.MaxUint64 || newResolvedTs != math.MaxUint64 { + if newCheckpointTs != newResolvedTs { log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not", zap.Uint64("checkpointTs", preCheckpointTs), zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), @@ -426,8 +426,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* newCheckpointTs = barrier.minDDLBarrierTs } - // If the table pipeline corresponding to minDDLBarrierTs is not ready, newCheckpointTs - // should wait for it. // Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter // whether redo is enabled. if newCheckpointTs > barrier.minDDLBarrierTs { @@ -459,6 +457,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } else { newResolvedTs = prevResolvedTs } + // If allPhysicalTables is empty, newCheckpointTs would advance to min table barrier ts, which may be larger + // than preResolvedTs. In this case, we need to set newCheckpointTs to preResolvedTs to guarantee that the + // checkpointTs will not cross the preResolvedTs. if newCheckpointTs > prevResolvedTs { newCheckpointTs = prevResolvedTs if newCheckpointTs < preCheckpointTs {