owner, redo(ticdc): decouple globalResolvedTs and globalBarrierTs (pi…
CharlesCheung96 committed May 22, 2023
1 parent 1881877 commit 104ec92
Showing 8 changed files with 182 additions and 118 deletions.
6 changes: 3 additions & 3 deletions cdc/model/owner.go
@@ -263,9 +263,9 @@ func (p ProcessorsInfos) String() string {
 type ChangeFeedStatus struct {
 	ResolvedTs   uint64 `json:"resolved-ts"`
 	CheckpointTs uint64 `json:"checkpoint-ts"`
-	// MinTableBarrierTs is the minimum table barrier timestamp of all tables.
-	// It is only used when a changefeed is started to check whether there was
-	// a table's DDL job that had not finished when the changefeed was stopped.
+	// minTableBarrierTs is the minimum commitTs of all DDL events and is only
+	// used to check whether there is a pending DDL job at the checkpointTs when
+	// initializing the changefeed.
 	MinTableBarrierTs uint64       `json:"min-table-barrier-ts"`
 	AdminJobType      AdminJobType `json:"admin-job-type"`
 }
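The rewritten comment pins down the only consumer of this field: when a changefeed is (re)initialized, the owner compares MinTableBarrierTs against CheckpointTs to decide whether a DDL whose commitTs equals the checkpoint may still be unfinished. A minimal sketch of that check, with an illustrative helper name rather than the actual owner code:

package sketch

// ChangeFeedStatus mirrors the fields of the struct above.
type ChangeFeedStatus struct {
	ResolvedTs        uint64
	CheckpointTs      uint64
	MinTableBarrierTs uint64
}

// hasPendingDDL is a hypothetical helper: since MinTableBarrierTs is the
// minimum commitTs of all DDL events, a value equal to CheckpointTs suggests
// a DDL at exactly that timestamp may not have been fully executed when the
// changefeed stopped.
func hasPendingDDL(status ChangeFeedStatus) bool {
	return status.MinTableBarrierTs == status.CheckpointTs
}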
48 changes: 21 additions & 27 deletions cdc/owner/changefeed.go
@@ -15,6 +15,7 @@ package owner

 import (
 	"context"
+	"math"
 	"sync"
 	"time"

@@ -282,7 +283,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		return nil
 	}
 	// TODO: pass table checkpointTs when we support concurrent process ddl
-	allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
+	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -302,11 +303,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		barrier.GlobalBarrierTs = otherBarrierTs
 	}

-	if minTableBarrierTs > otherBarrierTs {
+	if barrier.minDDLBarrierTs > otherBarrierTs {
 		log.Debug("There are other barriers less than min table barrier, wait for them",
 			zap.Uint64("otherBarrierTs", otherBarrierTs),
 			zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
-		minTableBarrierTs = otherBarrierTs
+		barrier.minDDLBarrierTs = otherBarrierTs
 	}

 	log.Debug("owner handles barrier",
@@ -315,7 +316,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		zap.Uint64("checkpointTs", checkpointTs),
 		zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
 		zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
-		zap.Uint64("minTableBarrierTs", minTableBarrierTs),
+		zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
 		zap.Any("tableBarrier", barrier.TableBarriers))

 	if barrier.GlobalBarrierTs < checkpointTs {
@@ -327,9 +328,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

 	startTime := time.Now()
 	newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
-		ctx, checkpointTs, allPhysicalTables, captures, barrier)
-	// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
-	metricsResolvedTs := newResolvedTs
+		ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
 	costTime := time.Since(startTime)
 	if costTime > schedulerLogsWarnDuration {
 		log.Warn("scheduler tick took too long",
@@ -356,19 +355,22 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

 	// If the owner is just initialized, the newResolvedTs may be max uint64.
 	// In this case, we should not update the resolved ts.
-	if newResolvedTs > barrier.GlobalBarrierTs {
-		newResolvedTs = barrier.GlobalBarrierTs
+	if newResolvedTs == math.MaxUint64 {
+		newResolvedTs = c.state.Status.ResolvedTs
 	}

 	// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
 	// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
 	// but it shouldn't be, so we need to handle it here.
-	if newCheckpointTs > minTableBarrierTs {
-		newCheckpointTs = minTableBarrierTs
+	if newCheckpointTs > barrier.minDDLBarrierTs {
+		newCheckpointTs = barrier.minDDLBarrierTs
 	}

 	prevResolvedTs := c.state.Status.ResolvedTs
 	if c.redoMetaMgr.Enabled() {
+		if newResolvedTs > barrier.redoBarrierTs {
+			newResolvedTs = barrier.redoBarrierTs
+		}
 		// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
 		// we can only upload it to etcd after it has been flushed into redo meta.
 		// NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally.
@@ -389,7 +391,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		} else {
 			newResolvedTs = prevResolvedTs
 		}
-		metricsResolvedTs = newResolvedTs
 	}
 	log.Debug("owner prepares to update status",
 		zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -401,12 +402,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 	// been decreased when the owner is initialized.
 	if newResolvedTs < prevResolvedTs {
 		newResolvedTs = prevResolvedTs
-		metricsResolvedTs = newResolvedTs
 	}

 	// MinTableBarrierTs should never regress
-	if minTableBarrierTs < c.state.Status.MinTableBarrierTs {
-		minTableBarrierTs = c.state.Status.MinTableBarrierTs
+	if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
+		barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
 	}

 	failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
@@ -420,8 +420,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		}
 	})

-	c.updateStatus(newCheckpointTs, newResolvedTs, minTableBarrierTs)
-	c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs)
+	c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
+	c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)

 	return nil
 }
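The barrier value returned by c.ddlManager.tick is defined in one of the other changed files and is not visible in these hunks. Judging only from the fields dereferenced above (barrier.Barrier, barrier.GlobalBarrierTs, barrier.TableBarriers, barrier.minDDLBarrierTs, barrier.redoBarrierTs), its shape is roughly the following; the type names and comments are inferred from this diff, not quoted from the commit:

package sketch

// Barrier stands in for the scheduler-level barrier that carries
// GlobalBarrierTs and TableBarriers and is what gets passed to
// c.scheduler.Tick as barrier.Barrier.
type Barrier struct {
	GlobalBarrierTs uint64
	TableBarriers   []TableBarrier
}

// TableBarrier is a per-table barrier entry.
type TableBarrier struct {
	TableID   int64
	BarrierTs uint64
}

// ddlBarrier is the assumed shape of what ddlManager.tick now returns.
type ddlBarrier struct {
	*Barrier
	// minDDLBarrierTs is the minimum commitTs of pending DDL events; in tick
	// it caps newCheckpointTs and is persisted as MinTableBarrierTs.
	minDDLBarrierTs uint64
	// redoBarrierTs caps newResolvedTs when the redo meta manager is enabled
	// (see the clamp added above).
	redoBarrierTs uint64
}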
@@ -855,25 +855,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
 	// 1. All data before the barrierTs was sent to downstream.
 	// 2. No more data after barrierTs was sent to downstream.
 	checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs
-
-	// TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition.
-	fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs
+	if !checkpointReachBarrier {
+		return barrierTs, nil
+	}

 	switch barrierTp {
 	case syncPointBarrier:
-		if !fullyBlocked {
-			return barrierTs, nil
-		}
 		nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
 		if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil {
 			return 0, errors.Trace(err)
 		}
 		c.barriers.Update(syncPointBarrier, nextSyncPointTs)
 	case finishBarrier:
-		if fullyBlocked {
-			c.feedStateManager.MarkFinished()
-		}
-		return barrierTs, nil
+		c.feedStateManager.MarkFinished()
 	default:
 		log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
 	}
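This hunk is the behavioral core of the commit title: previously a sync point was emitted and a finished changefeed was marked only when the feed was fully blocked, that is, both CheckpointTs and ResolvedTs had reached the barrier; now reaching the barrier with the checkpoint alone is enough, so the global resolved ts can advance independently of the global barrier ts. A one-line sketch of the relaxed gate, for contrast only:

package sketch

// barrierReached captures the new gating condition in handleBarrier: only the
// checkpoint has to reach the barrier. The pre-commit condition additionally
// required resolvedTs == barrierTs (the removed fullyBlocked flag).
func barrierReached(checkpointTs, resolvedTs, barrierTs uint64) bool {
	return checkpointTs == barrierTs
}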
