owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022) #9050

Closed
57 changes: 53 additions & 4 deletions cdc/owner/changefeed.go
@@ -246,10 +246,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
return errors.Trace(err)
}

@@ -282,7 +282,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}
// TODO: pass table checkpointTs when we support concurrent process ddl
<<<<<<< HEAD
allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
=======
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
if err != nil {
return errors.Trace(err)
}
@@ -312,13 +316,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
log.Debug("owner handles barrier",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", minTableBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
if barrier.GlobalBarrierTs < preCheckpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
@@ -327,9 +331,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
<<<<<<< HEAD
ctx, checkpointTs, allPhysicalTables, captures, barrier)
// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
metricsResolvedTs := newResolvedTs
=======
ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier)
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
@@ -354,6 +362,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

<<<<<<< HEAD
// If the owner is just initialized, the newResolvedTs may be max uint64.
// In this case, we should not update the resolved ts.
if newResolvedTs > barrier.GlobalBarrierTs {
@@ -365,6 +374,27 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// but it shouldn't be, so we need to handle it here.
if newCheckpointTs > minTableBarrierTs {
newCheckpointTs = minTableBarrierTs
=======
// If allPhysicalTables is empty, the newResolvedTs and newCheckpointTs should
// be max uint64. In this case, we need to advance newResolvedTs to global barrier
// ts and advance newCheckpointTs to min table barrier ts.
if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
if newCheckpointTs != newResolvedTs {
log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not",
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs))
}
newResolvedTs = barrier.GlobalBarrierTs
newCheckpointTs = barrier.minDDLBarrierTs
}

// Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter
// whether redo is enabled.
if newCheckpointTs > barrier.minDDLBarrierTs {
newCheckpointTs = barrier.minDDLBarrierTs
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
}

prevResolvedTs := c.state.Status.ResolvedTs
@@ -389,7 +419,21 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
<<<<<<< HEAD
metricsResolvedTs = newResolvedTs
=======
// If allPhysicalTables is empty, newCheckpointTs would advance to min table barrier ts, which may be larger
// than prevResolvedTs. In this case, we need to set newCheckpointTs to prevResolvedTs to guarantee that the
// checkpointTs will not cross the prevResolvedTs.
if newCheckpointTs > prevResolvedTs {
newCheckpointTs = prevResolvedTs
if newCheckpointTs < preCheckpointTs {
log.Panic("checkpointTs should never regress",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("checkpointTs", preCheckpointTs))
}
}
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -899,6 +943,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
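Taken together, the incoming side of the conflicts above amounts to one rule: when the scheduler owns no tables, it reports max uint64 for both timestamps, and the owner must then fall back to the DDL barriers so the changefeed keeps advancing instead of stalling. Below is a minimal, self-contained sketch of that bookkeeping. The `barrier` struct, the helper name `resolveTickTs`, and the error returns (standing in for the `log.Panic` calls) are illustrative assumptions; only the sentinel fallback and the two clamps mirror the diff, and the redo-specific branch is omitted.

```go
package main

import (
	"fmt"
	"math"
)

// barrier is a stand-in for the barrier type used in the diff; only the two
// fields referenced there are modeled.
type barrier struct {
	globalBarrierTs uint64
	minDDLBarrierTs uint64
}

// resolveTickTs condenses the post-scheduler logic from the diff.
func resolveTickTs(newCheckpointTs, newResolvedTs, prevResolvedTs uint64, b barrier) (uint64, uint64, error) {
	// When no table is synchronized, the scheduler reports max uint64 for both
	// timestamps; advance along the DDL barriers instead of stalling.
	if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
		if newCheckpointTs != newResolvedTs {
			return 0, 0, fmt.Errorf("newResolvedTs and newCheckpointTs should be both max uint64 or not")
		}
		newResolvedTs = b.globalBarrierTs
		newCheckpointTs = b.minDDLBarrierTs
	}
	// The checkpoint may never pass the min DDL barrier ts...
	if newCheckpointTs > b.minDDLBarrierTs {
		newCheckpointTs = b.minDDLBarrierTs
	}
	// ...nor the previously persisted resolved ts.
	if newCheckpointTs > prevResolvedTs {
		newCheckpointTs = prevResolvedTs
	}
	// Invariant enforced by updateStatus in the diff.
	if newCheckpointTs > newResolvedTs {
		return 0, 0, fmt.Errorf("checkpointTs %d is greater than resolvedTs %d", newCheckpointTs, newResolvedTs)
	}
	return newCheckpointTs, newResolvedTs, nil
}

func main() {
	// No table attached: both scheduler outputs are the max uint64 sentinel.
	cp, rts, err := resolveTickTs(math.MaxUint64, math.MaxUint64, 105,
		barrier{globalBarrierTs: 110, minDDLBarrierTs: 100})
	fmt.Println(cp, rts, err) // 100 110 <nil>
}
```

In the example run, the checkpoint advances to the min DDL barrier (100) and the resolved ts to the global barrier (110) even though no table reported progress, which is exactly the stuck-resolvedTs scenario the PR fixes.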
26 changes: 26 additions & 0 deletions tests/integration_tests/consistent_replicate_ddl/run.sh
@@ -47,7 +47,27 @@ function run() {
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
<<<<<<< HEAD
run_sql "insert into consistent_replicate_ddl.usertable2 select * from consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
=======

# case 1:
# global ddl tests -> ActionRenameTable
# table ddl tests -> ActionDropTable
run_sql "DROP TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable_bak TO consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO consistent_replicate_ddl.usertable1 SELECT * FROM consistent_replicate_ddl.usertable limit 10" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE consistent_replicate_ddl.usertable1 TO consistent_replicate_ddl.usertable1_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT IGNORE INTO consistent_replicate_ddl.usertable1_1 SELECT * FROM consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

# case 2:
# global ddl tests -> ActionCreateSchema, ActionModifySchemaCharsetAndCollate
# table ddl tests -> ActionMultiSchemaChange, ActionAddColumn, ActionDropColumn, ActionModifyTableCharsetAndCollate
run_sql "CREATE DATABASE consistent_replicate_ddl1" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER DATABASE consistent_replicate_ddl CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 CHARACTER SET utf8mb4 COLLATE utf8mb4_bin" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "INSERT INTO consistent_replicate_ddl.usertable2 SELECT * FROM consistent_replicate_ddl.usertable limit 20" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 DROP COLUMN FIELD0" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_ddl.usertable2 ADD COLUMN dummy varchar(30)" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_ddl.usertable3 select * from consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -71,7 +91,13 @@ function run() {
cdc redo apply --tmp-dir="$tmp_download_path/apply" \
--storage="$storage_path" \
--sink-uri="mysql://normal:123456@127.0.0.1:3306/"
<<<<<<< HEAD
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
=======

check_table_exists "consistent_replicate_ddl.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120
check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml
>>>>>>> eeb6b9f69e (owner(ticdc): fix the issue of resolvedTs may stuck when no table is synchronized (#9022))
}

trap stop EXIT
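For context on the GO_FAILPOINTS line in this test script: the two failpoint names are taken verbatim from it, and the sketch below shows how such a failpoint is conventionally declared with github.com/pingcap/failpoint. The surrounding function, DDL string, and sleep duration are illustrative assumptions, and failpoint.Inject is a no-op marker unless the build is rewritten with failpoint-ctl; when activated via `GO_FAILPOINTS=...=return(true)`, a hook like this keeps the sink's DDL pending so redo logs accumulate, which is what the redo-apply check relies on.

```go
package main

import (
	"fmt"
	"time"

	"github.com/pingcap/failpoint"
)

// execDDL sketches where a sink-side failpoint such as MySQLSinkExecDDLDelay
// would sit in the DDL execution path.
func execDDL(ddl string) error {
	failpoint.Inject("MySQLSinkExecDDLDelay", func() {
		time.Sleep(10 * time.Second) // hypothetical delay; the real hook differs
	})
	fmt.Println("exec:", ddl)
	return nil
}

func main() {
	_ = execDDL("ALTER TABLE consistent_replicate_ddl.usertable2 ADD COLUMN dummy varchar(30)")
}
```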