From 3b913161bd07e46cbfff1ec2fed236d4d122146b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 24 May 2023 09:22:47 +0800 Subject: [PATCH 1/3] fix resolvedTs --- cdc/owner/changefeed.go | 49 ++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 10dc2869dab..a300aa7cfc6 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -309,10 +309,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error { adminJobPending := c.feedStateManager.Tick(c.state) - checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status) + preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status) // checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()` // to ensure all changefeeds, no matter whether they are running or not, will be checked. 
- if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil { + if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil { return errors.Trace(err) } @@ -341,7 +341,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } // TODO: pass table checkpointTs when we support concurrent process ddl - allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil) + allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil) if err != nil { return errors.Trace(err) } @@ -371,13 +371,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* log.Debug("owner handles barrier", zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), - zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("checkpointTs", preCheckpointTs), zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs), zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs), zap.Any("tableBarrier", barrier.TableBarriers)) - if barrier.GlobalBarrierTs < checkpointTs { + if barrier.GlobalBarrierTs < preCheckpointTs { // This condition implies that the DDL resolved-ts has not yet reached checkpointTs, // which implies that it would be premature to schedule tables or to update status. // So we return here. @@ -386,7 +386,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* startTime := time.Now() newCheckpointTs, newResolvedTs, err := c.scheduler.Tick( - ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier) + ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier) costTime := time.Since(startTime) if costTime > schedulerLogsWarnDuration { log.Warn("scheduler tick took too long", @@ -411,15 +411,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* return nil } - // If the owner is just initialized, the newResolvedTs may be max uint64. 
- // In this case, we should not update the resolved ts. - if newResolvedTs == math.MaxUint64 { - newResolvedTs = c.state.Status.ResolvedTs + // If allPhysicalTables is empty, the newResolvedTs and newCheckpointTs should + // be max uint64. In this case, we need to advance newResolvedTs to global barrier + // ts and advance newCheckpointTs to min table barrier ts. + if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 { + if newCheckpointTs != math.MaxUint64 || newResolvedTs != math.MaxUint64 { + log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not", + zap.Uint64("checkpointTs", preCheckpointTs), + zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("newResolvedTs", newResolvedTs)) + } + newResolvedTs = barrier.GlobalBarrierTs + newCheckpointTs = barrier.minDDLBarrierTs } - // If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`. - // In such case the `newCheckpointTs` may be larger than the minTableBarrierTs, - // but it shouldn't be, so we need to handle it here. + // If the table pipeline corresponding to minDDLBarrierTs is not ready, newCheckpointTs + // should wait for it. + // Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter + // whether redo is enabled. 
if newCheckpointTs > barrier.minDDLBarrierTs { newCheckpointTs = barrier.minDDLBarrierTs } @@ -449,6 +459,14 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } else { newResolvedTs = prevResolvedTs } + if newCheckpointTs > prevResolvedTs { + newCheckpointTs = prevResolvedTs + if newCheckpointTs < preCheckpointTs { + log.Panic("checkpointTs should never regress", + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("checkpointTs", preCheckpointTs)) + } + } } log.Debug("owner prepares to update status", zap.Uint64("prevResolvedTs", prevResolvedTs), @@ -977,6 +995,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod } func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) { + if checkpointTs > resolvedTs { + log.Panic("checkpointTs is greater than resolvedTs", + zap.Uint64("checkpointTs", checkpointTs), + zap.Uint64("resolvedTs", resolvedTs)) + } c.state.PatchStatus( func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { changed := false From 61887681d89c254994aec6ee411febda596048ee Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 24 May 2023 09:31:53 +0800 Subject: [PATCH 2/3] fix tests --- tests/integration_tests/consistent_replicate_ddl/run.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration_tests/consistent_replicate_ddl/run.sh b/tests/integration_tests/consistent_replicate_ddl/run.sh index b1e23abc5b5..2760037a9f2 100644 --- a/tests/integration_tests/consistent_replicate_ddl/run.sh +++ b/tests/integration_tests/consistent_replicate_ddl/run.sh @@ -56,7 +56,6 @@ function run() { run_sql "DROP TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "RENAME TABLE consistent_replicate_ddl.usertable_bak TO consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT INTO consistent_replicate_ddl.usertable1 SELECT * FROM 
consistent_replicate_ddl.usertable limit 10" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - # run_sql "TRUNCATE TABLE consistent_replicate_ddl.usertable1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "RENAME TABLE consistent_replicate_ddl.usertable1 TO consistent_replicate_ddl.usertable1_1" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "INSERT IGNORE INTO consistent_replicate_ddl.usertable1_1 SELECT * FROM consistent_replicate_ddl.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -97,7 +96,7 @@ function run() { --storage="$storage_path" \ --sink-uri="mysql://normal:123456@127.0.0.1:3306/" - # sleep 6000000000000 + check_table_exists "consistent_replicate_ddl.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml } From 5cadf0c7d0bcb0c5dbf03bbe987a9acdebafe2a4 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 24 May 2023 13:55:11 +0800 Subject: [PATCH 3/3] add comments --- cdc/owner/changefeed.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a300aa7cfc6..7e91d7c5be9 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -415,7 +415,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* // be max uint64. In this case, we need to advance newResolvedTs to global barrier // ts and advance newCheckpointTs to min table barrier ts. 
if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 { - if newCheckpointTs != math.MaxUint64 || newResolvedTs != math.MaxUint64 { + if newCheckpointTs != newResolvedTs { log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not", zap.Uint64("checkpointTs", preCheckpointTs), zap.Uint64("resolvedTs", c.state.Status.ResolvedTs), @@ -426,8 +426,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* newCheckpointTs = barrier.minDDLBarrierTs } - // If the table pipeline corresponding to minDDLBarrierTs is not ready, newCheckpointTs - // should wait for it. // Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter // whether redo is enabled. if newCheckpointTs > barrier.minDDLBarrierTs { @@ -459,6 +457,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* } else { newResolvedTs = prevResolvedTs } + // If allPhysicalTables is empty, newCheckpointTs would advance to min table barrier ts, which may be larger + // than prevResolvedTs. In this case, we need to set newCheckpointTs to prevResolvedTs to guarantee that the + // checkpointTs will not cross the prevResolvedTs. if newCheckpointTs > prevResolvedTs { newCheckpointTs = prevResolvedTs if newCheckpointTs < preCheckpointTs {