
Commit eeb6b9f
owner(ticdc): fix the issue that resolvedTs may get stuck when no table is synchronized (#9022)

ref #8963
CharlesCheung96 authored May 24, 2023
1 parent 7e1292b commit eeb6b9f
Showing 2 changed files with 38 additions and 15 deletions.
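
A brief orientation before the diff: when a changefeed has no tables to synchronize, the scheduler reports math.MaxUint64 for both the new checkpoint ts and the new resolved ts. The old code then pinned the resolved ts to its previous value, so it could stay stuck indefinitely; the fix advances both watermarks from the DDL barrier timestamps instead. The sketch below is a minimal, runnable illustration of that logic, not the actual TiCDC code; advanceWatermarks and its parameters are hypothetical stand-ins for the fields used in cdc/owner/changefeed.go.

package main

import (
	"fmt"
	"math"
)

// advanceWatermarks sketches the fix: with no tables scheduled, both incoming
// values are math.MaxUint64, and instead of freezing the resolved ts (the old
// behavior, which left it stuck), the owner falls back to the DDL barriers.
// All identifiers here are hypothetical stand-ins, not TiCDC's real API.
func advanceWatermarks(newCheckpointTs, newResolvedTs, globalBarrierTs, minTableBarrierTs uint64) (uint64, uint64) {
	if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
		if newCheckpointTs != newResolvedTs {
			// The two watermarks must be MaxUint64 together or not at all.
			panic("newResolvedTs and newCheckpointTs should both be max uint64 or neither")
		}
		newResolvedTs = globalBarrierTs     // resolved ts keeps moving with the global barrier
		newCheckpointTs = minTableBarrierTs // checkpoint follows the min table barrier
	}
	// The checkpoint must never outrun the min table barrier.
	if newCheckpointTs > minTableBarrierTs {
		newCheckpointTs = minTableBarrierTs
	}
	return newCheckpointTs, newResolvedTs
}

func main() {
	// No tables: both inputs are MaxUint64, so the output tracks the
	// barriers (here 103/105) instead of staying frozen.
	cp, rts := advanceWatermarks(math.MaxUint64, math.MaxUint64, 105, 103)
	fmt.Println(cp, rts) // 103 105
}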
50 changes: 37 additions & 13 deletions cdc/owner/changefeed.go
@@ -309,10 +309,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs

 func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
 	adminJobPending := c.feedStateManager.Tick(c.state)
-	checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
+	preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
 	// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
 	// to ensure all changefeeds, no matter whether they are running or not, will be checked.
-	if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
+	if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
 		return errors.Trace(err)
 	}

@@ -341,7 +341,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 	}

 	// TODO: pass table checkpointTs when we support concurrent process ddl
-	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
+	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -371,13 +371,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
log.Debug("owner handles barrier",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
if barrier.GlobalBarrierTs < preCheckpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
@@ -386,7 +386,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
@@ -411,15 +411,23 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		return nil
 	}

-	// If the owner is just initialized, the newResolvedTs may be max uint64.
-	// In this case, we should not update the resolved ts.
-	if newResolvedTs == math.MaxUint64 {
-		newResolvedTs = c.state.Status.ResolvedTs
+	// If allPhysicalTables is empty, newResolvedTs and newCheckpointTs should
+	// both be max uint64. In this case, we need to advance newResolvedTs to the
+	// global barrier ts and newCheckpointTs to the min table barrier ts.
+	if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
+		if newCheckpointTs != newResolvedTs {
+			log.Panic("newResolvedTs and newCheckpointTs should both be max uint64 or neither",
+				zap.Uint64("checkpointTs", preCheckpointTs),
+				zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
+				zap.Uint64("newCheckpointTs", newCheckpointTs),
+				zap.Uint64("newResolvedTs", newResolvedTs))
+		}
+		newResolvedTs = barrier.GlobalBarrierTs
+		newCheckpointTs = barrier.minDDLBarrierTs
 	}

-	// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
-	// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
-	// but it shouldn't be, so we need to handle it here.
+	// Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter
+	// whether redo is enabled.
 	if newCheckpointTs > barrier.minDDLBarrierTs {
 		newCheckpointTs = barrier.minDDLBarrierTs
 	}
@@ -449,6 +457,17 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		} else {
 			newResolvedTs = prevResolvedTs
 		}
+		// If allPhysicalTables is empty, newCheckpointTs is advanced to the min
+		// table barrier ts, which may be larger than prevResolvedTs. In this case,
+		// we need to set newCheckpointTs to prevResolvedTs to guarantee that the
+		// checkpointTs does not cross prevResolvedTs.
+		if newCheckpointTs > prevResolvedTs {
+			newCheckpointTs = prevResolvedTs
+			if newCheckpointTs < preCheckpointTs {
+				log.Panic("checkpointTs should never regress",
+					zap.Uint64("newCheckpointTs", newCheckpointTs),
+					zap.Uint64("checkpointTs", preCheckpointTs))
+			}
+		}
 	}
 	log.Debug("owner prepares to update status",
 		zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -977,6 +996,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
 }

 func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
+	if checkpointTs > resolvedTs {
+		log.Panic("checkpointTs is greater than resolvedTs",
+			zap.Uint64("checkpointTs", checkpointTs),
+			zap.Uint64("resolvedTs", resolvedTs))
+	}
 	c.state.PatchStatus(
 		func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
 			changed := false
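The second half of the change interacts with the redo log: when redo is enabled, the resolved ts is capped by what the redo log has already flushed (prevResolvedTs), so a checkpoint that was just advanced to the min table barrier ts must be clamped back to prevResolvedTs; together with the new guard in updateStatus, this preserves the invariant checkpointTs <= resolvedTs without ever moving the checkpoint backwards. Below is a small runnable condensation of that clamp; as before, the names are hypothetical and this is a sketch, not TiCDC's API.

package main

import "fmt"

// clampToRedo condenses the redo-enabled branch in the diff above. The names
// are hypothetical: prevResolvedTs models the resolved ts the redo log has
// flushed, preCheckpointTs the checkpoint at the start of the tick.
func clampToRedo(newCheckpointTs, prevResolvedTs, preCheckpointTs uint64) uint64 {
	if newCheckpointTs > prevResolvedTs {
		// The checkpoint may not cross what redo has made durable.
		newCheckpointTs = prevResolvedTs
		if newCheckpointTs < preCheckpointTs {
			// Clamping must still never move the checkpoint backwards.
			panic("checkpointTs should never regress")
		}
	}
	return newCheckpointTs
}

func main() {
	// A barrier-advanced checkpoint (110) crosses the redo resolved ts (108):
	// it is clamped to 108, which is still >= the previous checkpoint (100).
	fmt.Println(clampToRedo(110, 108, 100)) // 108
}

Panicking rather than silently clamping mirrors the commit's defensive style: a regressing checkpoint would indicate corrupted progress tracking, which is better surfaced immediately.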
3 changes: 1 addition & 2 deletions tests/integration_tests/consistent_replicate_ddl/run.sh
@@ -56,7 +56,6 @@ function run() {
run_sql "DROP TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable_bak TO consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO consistent_replicate_ddl.usertable1 SELECT * FROM consistent_replicate_ddl.usertable limit 10" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# run_sql "TRUNCATE TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable1 TO consistent_replicate_ddl.usertable1_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT IGNORE INTO consistent_replicate_ddl.usertable1_1 SELECT * FROM consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

@@ -97,7 +96,7 @@ function run() {
 		--storage="$storage_path" \
 		--sink-uri="mysql://normal:123456@127.0.0.1:3306/"

-	# sleep 6000000000000
+	check_table_exists "consistent_replicate_ddl.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
 	check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml
}

