Skip to content

Commit

Permalink
owner, redo(ticdc): decouple globalResolvedTs and globalBarrierTs (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 25, 2023
1 parent f926a1b commit a2aaa4c
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 126 deletions.
6 changes: 3 additions & 3 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func (p ProcessorsInfos) String() string {
type ChangeFeedStatus struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// MinTableBarrierTs is the minimum table barrier timestamp of all tables.
// It is only used when a changefeed is started to check whether there was
// a table's DDL job that had not finished when the changefeed was stopped.
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
AdminJobType AdminJobType `json:"admin-job-type"`
}
Expand Down
88 changes: 53 additions & 35 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package owner

import (
"context"
"math"
"sync"
"time"

Expand Down Expand Up @@ -246,10 +247,10 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -282,7 +283,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}
// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -302,23 +303,23 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
barrier.GlobalBarrierTs = otherBarrierTs
}

if minTableBarrierTs > otherBarrierTs {
if barrier.minDDLBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than min table barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
minTableBarrierTs = otherBarrierTs
barrier.minDDLBarrierTs = otherBarrierTs
}

log.Debug("owner handles barrier",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", minTableBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
if barrier.GlobalBarrierTs < preCheckpointTs {
// This condition implies that the DDL resolved-ts has not yet reached checkpointTs,
// which implies that it would be premature to schedule tables or to update status.
// So we return here.
Expand All @@ -327,9 +328,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier)
// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
metricsResolvedTs := newResolvedTs
ctx, preCheckpointTs, allPhysicalTables, captures, barrier.Barrier)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
Expand All @@ -354,21 +353,32 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

// If the owner is just initialized, the newResolvedTs may be max uint64.
// In this case, we should not update the resolved ts.
if newResolvedTs > barrier.GlobalBarrierTs {
// If allPhysicalTables is empty, the newResolvedTs and newCheckpointTs shoulde
// be max uint64. In this case, we need to advance newResolvedTs to global barrier
// ts and advance newCheckpointTs to min table barrier ts.
if newResolvedTs == math.MaxUint64 || newCheckpointTs == math.MaxUint64 {
if newCheckpointTs != newResolvedTs {
log.Panic("newResolvedTs and newCheckpointTs should be both max uint64 or not",
zap.Uint64("checkpointTs", preCheckpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs))
}
newResolvedTs = barrier.GlobalBarrierTs
newCheckpointTs = barrier.minDDLBarrierTs
}

// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
// but it shouldn't be, so we need to handle it here.
if newCheckpointTs > minTableBarrierTs {
newCheckpointTs = minTableBarrierTs
// Note that newResolvedTs could be larger than barrier.GlobalBarrierTs no matter
// whether redo is enabled.
if newCheckpointTs > barrier.minDDLBarrierTs {
newCheckpointTs = barrier.minDDLBarrierTs
}

prevResolvedTs := c.state.Status.ResolvedTs
if c.redoMetaMgr.Enabled() {
if newResolvedTs > barrier.redoBarrierTs {
newResolvedTs = barrier.redoBarrierTs
}
// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
// we can only upload it to etcd after it has been flushed into redo meta.
// NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally.
Expand All @@ -389,7 +399,17 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
metricsResolvedTs = newResolvedTs
// If allPhysicalTables is empty, newCheckpointTs would advance to min table barrier ts, which may be larger
// than preResolvedTs. In this case, we need to set newCheckpointTs to preResolvedTs to guarantee that the
// checkpointTs will not cross the preResolvedTs.
if newCheckpointTs > prevResolvedTs {
newCheckpointTs = prevResolvedTs
if newCheckpointTs < preCheckpointTs {
log.Panic("checkpointTs should never regress",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("checkpointTs", preCheckpointTs))
}
}
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
Expand All @@ -401,12 +421,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
metricsResolvedTs = newResolvedTs
}

// MinTableBarrierTs should never regress
if minTableBarrierTs < c.state.Status.MinTableBarrierTs {
minTableBarrierTs = c.state.Status.MinTableBarrierTs
if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
}

failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
Expand All @@ -420,8 +439,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

c.updateStatus(newCheckpointTs, newResolvedTs, minTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs)
c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)

return nil
}
Expand Down Expand Up @@ -855,25 +874,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
// 1. All data before the barrierTs was sent to downstream.
// 2. No more data after barrierTs was sent to downstream.
checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs

// TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition.
fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs
if !checkpointReachBarrier {
return barrierTs, nil
}

switch barrierTp {
case syncPointBarrier:
if !fullyBlocked {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
case finishBarrier:
if fullyBlocked {
c.feedStateManager.MarkFinished()
}
return barrierTs, nil
c.feedStateManager.MarkFinished()
default:
log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
}
Expand All @@ -899,6 +912,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
Expand Down
Loading

0 comments on commit a2aaa4c

Please sign in to comment.