Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner, redo(ticdc): decouple globalResolvedTs and globalBarrierTs #8964

Merged
merged 5 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ storage_consumer:
kafka_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/kafka-consumer.Dockerfile . -t ticdc:kafka-consumer --platform linux/amd64

storage_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/storage-consumer.Dockerfile . -t ticdc:storage-consumer --platform linux/amd64
Expand Down
6 changes: 3 additions & 3 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ func (p ProcessorsInfos) String() string {
type ChangeFeedStatus struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// MinTableBarrierTs is the minimum table barrier timestamp of all tables.
// It is only used when a changefeed is started to check whether there was
// a table's DDL job that had not finished when the changefeed was stopped.
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
AdminJobType AdminJobType `json:"admin-job-type"`
}
Expand Down
48 changes: 21 additions & 27 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package owner
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -305,7 +306,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, minTableBarrierTs, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, checkpointTs, nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -325,11 +326,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
barrier.GlobalBarrierTs = otherBarrierTs
}

if minTableBarrierTs > otherBarrierTs {
if barrier.minDDLBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than min table barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
minTableBarrierTs = otherBarrierTs
barrier.minDDLBarrierTs = otherBarrierTs
}

log.Debug("owner handles barrier",
Expand All @@ -338,7 +339,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", minTableBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
Expand All @@ -350,9 +351,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier)
// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
metricsResolvedTs := newResolvedTs
ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
Expand All @@ -379,19 +378,22 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*

// If the owner is just initialized, the newResolvedTs may be max uint64.
// In this case, we should not update the resolved ts.
if newResolvedTs > barrier.GlobalBarrierTs {
newResolvedTs = barrier.GlobalBarrierTs
if newResolvedTs == math.MaxUint64 {
newResolvedTs = c.state.Status.ResolvedTs
}

// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
// but it shouldn't be, so we need to handle it here.
if newCheckpointTs > minTableBarrierTs {
newCheckpointTs = minTableBarrierTs
if newCheckpointTs > barrier.minDDLBarrierTs {
newCheckpointTs = barrier.minDDLBarrierTs
}

prevResolvedTs := c.state.Status.ResolvedTs
if c.redoMetaMgr.Enabled() {
if newResolvedTs > barrier.redoBarrierTs {
newResolvedTs = barrier.redoBarrierTs
}
// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
// we can only upload it to etcd after it has been flushed into redo meta.
// NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally.
Expand All @@ -412,7 +414,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
metricsResolvedTs = newResolvedTs
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
Expand All @@ -424,12 +425,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
metricsResolvedTs = newResolvedTs
}

// MinTableBarrierTs should never regress
if minTableBarrierTs < c.state.Status.MinTableBarrierTs {
minTableBarrierTs = c.state.Status.MinTableBarrierTs
if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
}

failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
Expand All @@ -443,8 +443,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

c.updateStatus(newCheckpointTs, newResolvedTs, minTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs)
c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which one is the globalBarrierTs now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateStatus only updates the status in etcd; globalBarrierTs, however, is propagated via p2p — ref: https://github.com/pingcap/tiflow/pull/8964/files#diff-953746562776cf8aa41cd4a971df8c546649d2721ae787607aed2069f3b44eb0R353-R354

c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
c.tickDownstreamObserver(ctx)

return nil
Expand Down Expand Up @@ -896,25 +896,19 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
// 1. All data before the barrierTs was sent to downstream.
// 2. No more data after barrierTs was sent to downstream.
checkpointReachBarrier := barrierTs == c.state.Status.CheckpointTs

// TODO: To check if we can remove the `barrierTs == c.state.Status.ResolvedTs` condition.
fullyBlocked := checkpointReachBarrier && barrierTs == c.state.Status.ResolvedTs
if !checkpointReachBarrier {
return barrierTs, nil
}

switch barrierTp {
case syncPointBarrier:
if !fullyBlocked {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
if err := c.ddlSink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
case finishBarrier:
if fullyBlocked {
c.feedStateManager.MarkFinished()
}
return barrierTs, nil
c.feedStateManager.MarkFinished()
default:
log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
}
Expand Down
Loading