Skip to content

Commit

Permalink
fix resolvedTs
Browse files: browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed May 23, 2023
1 parent 9cde63c commit c5f11a5
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -371,13 +371,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
log.Debug("owner handles barrier",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
if barrier.GlobalBarrierTs < preCheckpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
Expand All @@ -386,7 +386,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
Expand All @@ -413,15 +413,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

// If the owner is just initialized, the newResolvedTs may be max uint64.
// In this case, we should not update the resolved ts.
if newResolvedTs == math.MaxUint64 {
newResolvedTs = c.state.Status.ResolvedTs
if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
if newCheckpointTs != math.MaxUint64 || newResolvedTs != math.MaxUint64 {
log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not",
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs))
}
newResolvedTs = barrier.GlobalBarrierTs
newCheckpointTs = barrier.minDDLBarrierTs
}

// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
// but it shouldn't be, so we need to handle it here.
// Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter
// whether redo is enabled.
if newCheckpointTs > barrier.minDDLBarrierTs {
newCheckpointTs = barrier.minDDLBarrierTs
log.Panic("newCheckpointTs is larger than minDDLBarrierTs",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Any("barrier", barrier))
}

prevResolvedTs := c.state.Status.ResolvedTs
Expand Down Expand Up @@ -449,6 +459,14 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
if newCheckpointTs > prevResolvedTs {
newCheckpointTs = prevResolvedTs
if newCheckpointTs < preCheckpointTs {
log.Panic("checkpointTs should never regress",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("checkpointTs", preCheckpointTs))
}
}
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
Expand Down Expand Up @@ -977,6 +995,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
Expand Down

0 comments on commit c5f11a5

Please sign in to comment.