Commit d877c76

owner, redo(ticdc): decouple globalResolvedTs and globalBarrierTs (#8964) (#8996)

close #8963
ti-chi-bot authored May 19, 2023
1 parent 2ea7904 commit d877c76
Showing 9 changed files with 183 additions and 118 deletions.
Makefile (1 change: 1 addition & 0 deletions)

@@ -158,6 +158,7 @@ storage_consumer:
 kafka_consumer_image:
 	@which docker || (echo "docker not found in ${PATH}"; exit 1)
 	DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/kafka-consumer.Dockerfile . -t ticdc:kafka-consumer --platform linux/amd64
+
 storage_consumer_image:
 	@which docker || (echo "docker not found in ${PATH}"; exit 1)
 	DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/storage-consumer.Dockerfile . -t ticdc:storage-consumer --platform linux/amd64
cdc/model/owner.go (6 changes: 3 additions & 3 deletions)

@@ -262,9 +262,9 @@ func (p ProcessorsInfos) String() string {
 type ChangeFeedStatus struct {
 	ResolvedTs   uint64 `json:"resolved-ts"`
 	CheckpointTs uint64 `json:"checkpoint-ts"`
-	// MinTableBarrierTs is the minimum table barrier timestamp of all tables.
-	// It is only used when a changefeed is started to check whether there was
-	// a table's DDL job that had not finished when the changefeed was stopped.
+	// minTableBarrierTs is the minimum commitTs of all DDL events and is only
+	// used to check whether there is a pending DDL job at the checkpointTs when
+	// initializing the changefeed.
 	MinTableBarrierTs uint64       `json:"min-table-barrier-ts"`
 	AdminJobType      AdminJobType `json:"admin-job-type"`
 }
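The rewritten comment also pins down when the field is consulted. A minimal sketch of that check, assuming a hypothetical helper name (the commit itself adds no such function, and the real logic lives in the owner's DDL handling):

package model

// pendingDDLAtCheckpoint is a hypothetical illustration, not code from
// this commit: on changefeed initialization, a DDL event whose commitTs
// equals the current checkpointTs may not have been fully executed
// downstream yet, so it has to be replayed before tables resume.
func pendingDDLAtCheckpoint(status *ChangeFeedStatus) bool {
	return status.MinTableBarrierTs == status.CheckpointTs
}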
cdc/owner/changefeed.go (48 changes: 21 additions & 27 deletions)
@@ -16,6 +16,7 @@ package owner
 import (
 	"context"
 	"fmt"
+	"math"
 	"strings"
 	"sync"
 	"time"
@@ -305,7 +306,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 	}
 
 	// TODO: pass table checkpointTs when we support concurrent process ddl
-	allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
+	allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
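With this change, ddlManager.tick no longer returns a separate minTableBarrierTs; the value travels inside the returned barrier. A sketch of what that aggregate plausibly looks like, inferred only from the accesses visible in this diff (barrier.Barrier, barrier.GlobalBarrierTs, barrier.TableBarriers, barrier.minDDLBarrierTs, barrier.redoBarrierTs); the actual definition may differ:

package owner

import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
)

// ddlBarrier is an inferred sketch, not the repository's verbatim type.
type ddlBarrier struct {
	*schedulepb.Barrier // GlobalBarrierTs and TableBarriers, handed to scheduler.Tick

	// minDDLBarrierTs is the minimum commitTs of all pending DDL events;
	// the owner clamps newCheckpointTs to it further down in tick.
	minDDLBarrierTs model.Ts

	// redoBarrierTs caps newResolvedTs while redo logging is enabled.
	redoBarrierTs model.Ts
}

If the real field names differ, only the labels change; the shape the diff relies on is a scheduler barrier plus two owner-side caps.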
@@ -325,11 +326,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		barrier.GlobalBarrierTs = otherBarrierTs
 	}
 
-	if minTableBarrierTs > otherBarrierTs {
+	if barrier.minDDLBarrierTs > otherBarrierTs {
 		log.Debug("There are other barriers less than min table barrier, wait for them",
 			zap.Uint64("otherBarrierTs", otherBarrierTs),
 			zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
-		minTableBarrierTs = otherBarrierTs
+		barrier.minDDLBarrierTs = otherBarrierTs
 	}
 
 	log.Debug("owner handles barrier",
@@ -338,7 +339,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		zap.Uint64("checkpointTs", checkpointTs),
 		zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
 		zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
-		zap.Uint64("minTableBarrierTs", minTableBarrierTs),
+		zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
 		zap.Any("tableBarrier", barrier.TableBarriers))
 
 	if barrier.GlobalBarrierTs < checkpointTs {
@@ -350,9 +351,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 
 	startTime := time.Now()
 	newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
-		ctx, checkpointTs, allPhysicalTables, captures, barrier)
-	// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
-	metricsResolvedTs := newResolvedTs
+		ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
 	costTime := time.Since(startTime)
 	if costTime > schedulerLogsWarnDuration {
 		log.Warn("scheduler tick took too long",
@@ -379,19 +378,22 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 
 	// If the owner is just initialized, the newResolvedTs may be max uint64.
 	// In this case, we should not update the resolved ts.
-	if newResolvedTs > barrier.GlobalBarrierTs {
-		newResolvedTs = barrier.GlobalBarrierTs
+	if newResolvedTs == math.MaxUint64 {
+		newResolvedTs = c.state.Status.ResolvedTs
 	}
 
 	// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
 	// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
 	// but it shouldn't be, so we need to handle it here.
-	if newCheckpointTs > minTableBarrierTs {
-		newCheckpointTs = minTableBarrierTs
+	if newCheckpointTs > barrier.minDDLBarrierTs {
+		newCheckpointTs = barrier.minDDLBarrierTs
 	}
 
 	prevResolvedTs := c.state.Status.ResolvedTs
 	if c.redoMetaMgr.Enabled() {
+		if newResolvedTs > barrier.redoBarrierTs {
+			newResolvedTs = barrier.redoBarrierTs
+		}
 		// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
 		// we can only upload it to etcd after it has been flushed into redo meta.
 		// NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally.
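Taken together, the hunk above fixes an order for clamping the two watermarks. A condensed, dependency-free restatement under invented names (a sketch of the control flow only, not the owner's actual code):

package main

import (
	"fmt"
	"math"
)

// clampWatermarks restates the logic above in isolation: an uninitialized
// resolvedTs (max uint64) falls back to the previous value, checkpointTs
// never passes the minimum DDL barrier, and with redo enabled resolvedTs
// never passes the redo barrier.
func clampWatermarks(newCkpt, newRes, prevRes, minDDL, redoBarrier uint64, redoEnabled bool) (uint64, uint64) {
	if newRes == math.MaxUint64 {
		newRes = prevRes
	}
	if newCkpt > minDDL {
		newCkpt = minDDL
	}
	if redoEnabled && newRes > redoBarrier {
		newRes = redoBarrier
	}
	return newCkpt, newRes
}

func main() {
	ckpt, res := clampWatermarks(105, math.MaxUint64, 100, 103, 0, false)
	fmt.Println(ckpt, res) // 103 100: the checkpoint is held at the DDL barrier
}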
@@ -412,7 +414,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		} else {
 			newResolvedTs = prevResolvedTs
 		}
-		metricsResolvedTs = newResolvedTs
 	}
 	log.Debug("owner prepares to update status",
 		zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -424,12 +425,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 	// been decreased when the owner is initialized.
 	if newResolvedTs < prevResolvedTs {
 		newResolvedTs = prevResolvedTs
-		metricsResolvedTs = newResolvedTs
 	}
 
 	// MinTableBarrierTs should never regress
-	if minTableBarrierTs < c.state.Status.MinTableBarrierTs {
-		minTableBarrierTs = c.state.Status.MinTableBarrierTs
+	if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
+		barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
 	}
 
 	failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
@@ -443,8 +443,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		}
 	})
 
-	c.updateStatus(newCheckpointTs, newResolvedTs, minTableBarrierTs)
-	c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs)
+	c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
+	c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
 	c.tickDownstreamObserver(ctx)
 
 	return nil
@@ -896,25 +896,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
 	// 1. All data before the barrierTs was sent to downstream.
 	// 2. No more data after barrierTs was sent to downstream.
 	checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs
-
-	// TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition.
-	fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs
+	if !checkpointReachBarrier {
+		return barrierTs, nil
+	}
 
 	switch barrierTp {
 	case syncPointBarrier:
-		if !fullyBlocked {
-			return barrierTs, nil
-		}
 		nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
 		if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil {
 			return 0, errors.Trace(err)
 		}
 		c.barriers.Update(syncPointBarrier, nextSyncPointTs)
 	case finishBarrier:
-		if fullyBlocked {
-			c.feedStateManager.MarkFinished()
-		}
-		return barrierTs, nil
+		c.feedStateManager.MarkFinished()
 	default:
 		log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
 	}
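The rewrite above collapses the old fullyBlocked check (checkpointTs and resolvedTs both at the barrier) into a single early return on checkpointTs alone, which is the decoupling this commit is about: resolvedTs no longer gates barrier actions. A schematic restatement with the barrier actions stubbed out (invented names, not the repository's code):

package main

import "fmt"

// handleBarrierSketch mirrors the simplified flow: the only gate is
// checkpointTs reaching the barrier; once reached, the barrier's action
// fires unconditionally.
func handleBarrierSketch(barrierTs, checkpointTs uint64, finish bool) uint64 {
	if barrierTs != checkpointTs {
		return barrierTs // checkpoint has not reached the barrier yet
	}
	if finish {
		fmt.Println("mark changefeed finished") // stands in for feedStateManager.MarkFinished()
	} else {
		fmt.Println("emit sync point, schedule next") // stands in for the syncPointBarrier arm
	}
	return barrierTs
}

func main() {
	fmt.Println(handleBarrierSketch(100, 90, false)) // still waiting
	fmt.Println(handleBarrierSketch(100, 100, true)) // fires the finish action
}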
