Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized #9022

Merged
merged 4 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -371,13 +371,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
log.Debug("owner handles barrier",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
if barrier.GlobalBarrierTs < preCheckpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
Expand All @@ -386,7 +386,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
Expand All @@ -411,15 +411,23 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

// If the owner is just initialized, the newResolvedTs may be max uint64.
// In this case, we should not update the resolved ts.
if newResolvedTs == math.MaxUint64 {
newResolvedTs = c.state.Status.ResolvedTs
// If allPhysicalTables is empty, the newResolvedTs and newCheckpointTs should
// be max uint64. In this case, we need to advance newResolvedTs to global barrier
// ts and advance newCheckpointTs to min table barrier ts.
if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
if newCheckpointTs != newResolvedTs {
log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not",
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs))
}
newResolvedTs = barrier.GlobalBarrierTs
newCheckpointTs = barrier.minDDLBarrierTs
}

// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
// In such a case the `newCheckpointTs` may be larger than the minTableBarrierTs,
// but it shouldn't be, so we need to handle it here.
// Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter
// whether redo is enabled.
if newCheckpointTs > barrier.minDDLBarrierTs {
newCheckpointTs = barrier.minDDLBarrierTs
}
Expand Down Expand Up @@ -449,6 +457,17 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
// If allPhysicalTables is empty, newCheckpointTs would advance to min table barrier ts, which may be larger
// than prevResolvedTs. In this case, we need to set newCheckpointTs to prevResolvedTs to guarantee that the
// checkpointTs will not cross the prevResolvedTs.
if newCheckpointTs > prevResolvedTs {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
newCheckpointTs = prevResolvedTs
if newCheckpointTs < preCheckpointTs {
log.Panic("checkpointTs should never regress",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("checkpointTs", preCheckpointTs))
}
}
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
Expand Down Expand Up @@ -977,6 +996,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_tests/consistent_replicate_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ function run() {
run_sql "DROP TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable_bak TO consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO consistent_replicate_ddl.usertable1 SELECT * FROM consistent_replicate_ddl.usertable limit 10" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# run_sql "TRUNCATE TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable1 TO consistent_replicate_ddl.usertable1_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT IGNORE INTO consistent_replicate_ddl.usertable1_1 SELECT * FROM consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

Expand Down Expand Up @@ -97,7 +96,7 @@ function run() {
--storage="$storage_path" \
--sink-uri="mysql://normal:123456@127.0.0.1:3306/"

# sleep 6000000000000
check_table_exists "consistent_replicate_ddl.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml
}

Expand Down