fix resolvedTs and checkpointTs
CharlesCheung96 committed May 23, 2023
1 parent 9cde63c commit bd97c4c
Showing 12 changed files with 103 additions and 69 deletions.
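
This commit replaces the owner-local minDDLBarrierTs bookkeeping with a schedulepb.BarrierWithMinTs value that is threaded through Scheduler.Tick down to replication.Manager.AdvanceCheckpoint, and adds guards so the checkpoint can no longer run ahead of the resolved ts. As a reading aid, here is a minimal sketch of the shape BarrierWithMinTs appears to have, inferred only from how this diff constructs and reads it (embedded Barrier, a MinTableBarrierTs field, a NewBarrierWithMinTs constructor); it is not the actual schedulepb definition.

```go
// Sketch only: names follow the call sites in this diff, not the real
// generated schedulepb types.
package schedulepbsketch

// TableBarrier pins a single table to a DDL commit ts.
type TableBarrier struct {
	TableID   int64
	BarrierTs uint64
}

// Barrier mirrors how the existing schedulepb.Barrier is used here.
type Barrier struct {
	TableBarriers   []*TableBarrier
	GlobalBarrierTs uint64
}

// BarrierWithMinTs bundles the Barrier broadcast to captures with the minimum
// commit ts of all pending DDL events, which the owner previously tracked
// separately as minDDLBarrierTs.
type BarrierWithMinTs struct {
	*Barrier
	MinTableBarrierTs uint64
}

// NewBarrierWithMinTs starts both watermarks at the same ts, matching how
// ddlManager.barrier() seeds everything from ddlResolvedTs.
func NewBarrierWithMinTs(ts uint64) BarrierWithMinTs {
	return BarrierWithMinTs{
		Barrier:           &Barrier{GlobalBarrierTs: ts},
		MinTableBarrierTs: ts,
	}
}
```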
48 changes: 19 additions & 29 deletions cdc/owner/changefeed.go
@@ -16,7 +16,6 @@ package owner
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
@@ -361,11 +360,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
barrier.GlobalBarrierTs = otherBarrierTs
}

- if barrier.minDDLBarrierTs > otherBarrierTs {
+ if barrier.MinTableBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than min table barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
- barrier.minDDLBarrierTs = otherBarrierTs
+ barrier.MinTableBarrierTs = otherBarrierTs
}

log.Debug("owner handles barrier",
@@ -374,7 +373,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.MinTableBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
@@ -384,20 +383,16 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

- startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
- ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
- costTime := time.Since(startTime)
- if costTime > schedulerLogsWarnDuration {
- log.Warn("scheduler tick took too long",
- zap.String("namespace", c.id.Namespace),
- zap.String("changefeed", c.id.ID), zap.Duration("duration", costTime))
- }
+ ctx, checkpointTs, allPhysicalTables, captures, barrier.BarrierWithMinTs)
if err != nil {
return errors.Trace(err)
}

- pdTime, _ := c.upstream.PDClock.CurrentTime()
+ pdTime, err := c.upstream.PDClock.CurrentTime()
+ if err != nil {
+ return errors.Trace(err)
+ }
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
@@ -411,19 +406,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

- // If the owner is just initialized, the newResolvedTs may be max uint64.
- // In this case, we should not update the resolved ts.
- if newResolvedTs == math.MaxUint64 {
- newResolvedTs = c.state.Status.ResolvedTs
- }
-
- // If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
- // In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
- // but it shouldn't be, so we need to handle it here.
- if newCheckpointTs > barrier.minDDLBarrierTs {
- newCheckpointTs = barrier.minDDLBarrierTs
- }

prevResolvedTs := c.state.Status.ResolvedTs
if c.redoMetaMgr.Enabled() {
if newResolvedTs > barrier.redoBarrierTs {
@@ -449,6 +431,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
+ if newCheckpointTs > prevResolvedTs {
+ newCheckpointTs = prevResolvedTs
+ }
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -463,8 +448,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}

// MinTableBarrierTs should never regress
- if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
- barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
+ if barrier.MinTableBarrierTs < c.state.Status.MinTableBarrierTs {
+ barrier.MinTableBarrierTs = c.state.Status.MinTableBarrierTs
}

failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
@@ -478,7 +463,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

- c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
+ c.updateStatus(newCheckpointTs, newResolvedTs, barrier.MinTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
c.tickDownstreamObserver(ctx)

@@ -977,6 +962,11 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
+ if checkpointTs > resolvedTs {
+ log.Panic("checkpointTs is greater than resolvedTs",
+ zap.Uint64("checkpointTs", checkpointTs),
+ zap.Uint64("resolvedTs", resolvedTs))
+ }
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
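To make the intent of the changefeed.go changes concrete: the MaxUint64 and min-table-barrier clamping moves out of the owner (deleted above) and into the scheduler, the redo branch now also caps the checkpoint at the previous resolved ts, and updateStatus turns any remaining checkpointTs > resolvedTs into a panic. The following self-contained toy models only those visible clamps with plain integers; the elided redo flush path and the real tiflow types are not represented.

```go
package main

import "fmt"

// clampForRedo models the guard added in the redo branch of changefeed.tick:
// the resolved ts is pinned to its previous value, and the checkpoint must
// not run past it.
func clampForRedo(newCheckpointTs, prevResolvedTs uint64) (checkpointTs, resolvedTs uint64) {
	if newCheckpointTs > prevResolvedTs {
		newCheckpointTs = prevResolvedTs
	}
	return newCheckpointTs, prevResolvedTs
}

// mustBeOrdered models the sanity check added to updateStatus: after all
// clamping, a checkpoint ahead of the resolved ts indicates a bug.
func mustBeOrdered(checkpointTs, resolvedTs uint64) {
	if checkpointTs > resolvedTs {
		panic(fmt.Sprintf("checkpointTs %d is greater than resolvedTs %d", checkpointTs, resolvedTs))
	}
}

func main() {
	// Example: the scheduler proposed checkpoint 110, but redo has only
	// confirmed resolved ts 95, so both watermarks settle at 95.
	cts, rts := clampForRedo(110, 95)
	mustBeOrdered(cts, rts)
	fmt.Println(cts, rts) // 95 95
}
```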
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
@@ -163,7 +163,7 @@ func (m *mockScheduler) Tick(
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
- barrier *schedulepb.Barrier,
+ barrier schedulepb.BarrierWithMinTs,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
m.currentTables = currentTables
return model.Ts(math.MaxUint64), model.Ts(math.MaxUint64), nil
17 changes: 5 additions & 12 deletions cdc/owner/ddl_manager.go
@@ -88,11 +88,7 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
}

type ddlBarrier struct {
- *schedulepb.Barrier
- // minDDLBarrierTs is the minimum commitTs of all DDL events and is only
- // used to check whether there is a pending DDL job at the checkpointTs when
- // initializing the changefeed.
- minDDLBarrierTs model.Ts
+ schedulepb.BarrierWithMinTs
// redoBarrierTs is the minimum ts of ddl events that create a new physical table.
redoBarrierTs model.Ts
}
@@ -438,11 +434,8 @@ func (m *ddlManager) getAllTableNextDDL() []*model.DDLEvent {
// barrier returns ddlResolvedTs and tableBarrier
func (m *ddlManager) barrier() *ddlBarrier {
barrier := &ddlBarrier{
Barrier: &schedulepb.Barrier{
GlobalBarrierTs: m.ddlResolvedTs,
},
minDDLBarrierTs: m.ddlResolvedTs,
redoBarrierTs: m.ddlResolvedTs,
BarrierWithMinTs: schedulepb.NewBarrierWithMinTs(m.ddlResolvedTs),
redoBarrierTs: m.ddlResolvedTs,
}
tableBarrierMap := make(map[model.TableID]model.Ts)
ddls := m.getAllTableNextDDL()
@@ -451,8 +444,8 @@ func (m *ddlManager) barrier() *ddlBarrier {
}

for _, ddl := range ddls {
- if ddl.CommitTs < barrier.minDDLBarrierTs {
- barrier.minDDLBarrierTs = ddl.CommitTs
+ if ddl.CommitTs < barrier.MinTableBarrierTs {
+ barrier.MinTableBarrierTs = ddl.CommitTs
}
if m.redoMetaManager.Enabled() && isRedoBarrierDDL(ddl) {
// The pipeline for a new table does not exist until the ddl is successfully
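Because ddlBarrier now embeds schedulepb.BarrierWithMinTs, which in turn (per its usage above) embeds a *Barrier, accesses such as barrier.MinTableBarrierTs and barrier.GlobalBarrierTs in changefeed.go resolve through two levels of Go field promotion. A self-contained illustration with stand-in types; the names mirror the diff, but these are not the real tiflow definitions.

```go
package main

import "fmt"

// Stand-ins for the tiflow types; only the embedding structure matters here.
type Barrier struct{ GlobalBarrierTs uint64 }

type BarrierWithMinTs struct {
	*Barrier
	MinTableBarrierTs uint64
}

type ddlBarrier struct {
	BarrierWithMinTs
	redoBarrierTs uint64
}

func main() {
	b := ddlBarrier{
		BarrierWithMinTs: BarrierWithMinTs{
			Barrier:           &Barrier{GlobalBarrierTs: 100},
			MinTableBarrierTs: 90,
		},
		redoBarrierTs: 90,
	}
	// Promoted fields: both reads resolve through the embedded values.
	fmt.Println(b.GlobalBarrierTs, b.MinTableBarrierTs, b.redoBarrierTs) // 100 90 90
}
```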
4 changes: 2 additions & 2 deletions cdc/owner/ddl_manager_test.go
@@ -120,7 +120,7 @@ func TestBarriers(t *testing.T) {
// advance the ddlResolvedTs
dm.ddlResolvedTs = 6
ddlBarrier := dm.barrier()
- minTableBarrierTs, barrier := ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier
+ minTableBarrierTs, barrier := ddlBarrier.MinTableBarrierTs, ddlBarrier.Barrier
require.Equal(t, expectedMinTableBarrier, minTableBarrierTs)
require.Equal(t, expectedBarrier, barrier)

@@ -134,7 +134,7 @@ func TestBarriers(t *testing.T) {
newFakeDDLEvent(tableID, tableName.Table, timodel.ActionAddColumn, uint64(i)))
}
ddlBarrier = dm.barrier()
- minTableBarrierTs, barrier = ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier
+ minTableBarrierTs, barrier = ddlBarrier.MinTableBarrierTs, ddlBarrier.Barrier
require.Equal(t, uint64(0), minTableBarrierTs)
require.Equal(t, uint64(256), barrier.GlobalBarrierTs)
require.Equal(t, 256, len(barrier.TableBarriers))
1 change: 0 additions & 1 deletion cdc/owner/metrics.go
@@ -127,7 +127,6 @@ const (
// should print a warning log, and if necessary, the timeout should be exposed externally through
// monitor.
changefeedLogsWarnDuration = 1 * time.Second
- schedulerLogsWarnDuration = 1 * time.Second

// TiDB collects metric data every 1 minute
downstreamObserverTickDuration = 30 * time.Second
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/scheduler.go
@@ -46,7 +46,7 @@ type Scheduler interface {
// barrier contains the barrierTs of those tables that have
// ddl jobs that need to be replicated. The Scheduler will
// broadcast the barrierTs to all captures through the Heartbeat.
- barrier *schedulepb.Barrier,
+ barrier schedulepb.BarrierWithMinTs,
) (newCheckpointTs, newResolvedTs model.Ts, err error)

// MoveTable requests that a table be moved to target.
15 changes: 9 additions & 6 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -127,7 +127,7 @@ func (c *coordinator) Tick(
currentTables []model.TableID,
// All captures that are alive according to the latest Etcd states.
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
- barrier *schedulepb.Barrier,
+ barrier schedulepb.BarrierWithMinTs,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -259,8 +259,11 @@ func (c *coordinator) Close(ctx context.Context) {
// ===========

func (c *coordinator) poll(
- ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
- aliveCaptures map[model.CaptureID]*model.CaptureInfo, barrier *schedulepb.Barrier,
+ ctx context.Context,
+ checkpointTs model.Ts,
+ currentTables []model.TableID,
+ aliveCaptures map[model.CaptureID]*model.CaptureInfo,
+ barrier schedulepb.BarrierWithMinTs,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.maybeCollectMetrics()
if c.compat.UpdateCaptureInfo(aliveCaptures) {
@@ -277,7 +280,7 @@ func (c *coordinator) poll(
var msgBuf []*schedulepb.Message
c.captureM.HandleMessage(recvMsgs)
msgs := c.captureM.Tick(c.replicationM.ReplicationSets(),
- c.schedulerM.DrainingTarget(), barrier)
+ c.schedulerM.DrainingTarget(), barrier.Barrier)
msgBuf = append(msgBuf, msgs...)
msgs = c.captureM.HandleAliveCaptureUpdate(aliveCaptures)
msgBuf = append(msgBuf, msgs...)
@@ -302,7 +305,7 @@ func (c *coordinator) poll(
if !c.captureM.CheckAllCaptureInitialized() {
// Skip generating schedule tasks for replication manager,
// as not all capture are initialized.
- newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime)
+ newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier)
return newCheckpointTs, newResolvedTs, c.sendMsgs(ctx, msgBuf)
}

@@ -338,7 +341,7 @@ func (c *coordinator) poll(
}

// Checkpoint calculation
- newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime)
+ newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(&c.tableRanges, pdTime, barrier)
return newCheckpointTs, newResolvedTs, nil
}

2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -47,7 +47,7 @@ func benchmarkCoordinator(
b.ResetTimer()
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
- coord.poll(ctx, 0, currentTables, captures, nil)
+ coord.poll(ctx, 0, currentTables, captures, schedulepb.NewBarrierWithMinTs(0))
}
})
b.StopTimer()
16 changes: 8 additions & 8 deletions cdc/scheduler/internal/v3/coordinator_test.go
@@ -226,7 +226,7 @@ func TestCoordinatorHeartbeat(t *testing.T) {
ctx := context.Background()
currentTables := []model.TableID{1, 2, 3}
aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
- _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
msgs := trans.SendBuffer
require.Len(t, msgs, 2)
@@ -258,7 +258,7 @@ func TestCoordinatorHeartbeat(t *testing.T) {
},
})
trans.SendBuffer = []*schedulepb.Message{}
- _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
require.True(t, coord.captureM.CheckAllCaptureInitialized())
msgs = trans.SendBuffer
@@ -299,7 +299,7 @@ func TestCoordinatorAddCapture(t *testing.T) {
ctx := context.Background()
currentTables := []model.TableID{1, 2, 3}
aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
- _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
msgs = trans.SendBuffer
require.Len(t, msgs, 1)
@@ -315,7 +315,7 @@ func TestCoordinatorAddCapture(t *testing.T) {
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
})
trans.SendBuffer = []*schedulepb.Message{}
- _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
msgs = trans.SendBuffer
require.Len(t, msgs, 1)
@@ -356,7 +356,7 @@ func TestCoordinatorRemoveCapture(t *testing.T) {
ctx := context.Background()
currentTables := []model.TableID{1, 2, 3}
aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
- _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
msgs = trans.SendBuffer
require.Len(t, msgs, 1)
@@ -431,7 +431,7 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) {
ctx := context.Background()
currentTables := []model.TableID{1, 2}
aliveCaptures := map[model.CaptureID]*model.CaptureInfo{"a": {}, "b": {}}
- _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ _, _, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)

// Initialize captures.
@@ -470,7 +470,7 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) {
},
},
})
- cts, rts, err := coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ cts, rts, err := coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
require.True(t, coord.captureM.CheckAllCaptureInitialized())
require.EqualValues(t, 2, cts)
@@ -505,7 +505,7 @@ func TestCoordinatorAdvanceCheckpoint(t *testing.T) {
},
},
})
- cts, rts, err = coord.poll(ctx, 0, currentTables, aliveCaptures, nil)
+ cts, rts, err = coord.poll(ctx, 0, currentTables, aliveCaptures, schedulepb.NewBarrierWithMinTs(0))
require.Nil(t, err)
require.False(t, coord.captureM.CheckAllCaptureInitialized())
require.EqualValues(t, 3, cts)
16 changes: 15 additions & 1 deletion cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -506,7 +506,9 @@ func (r *Manager) RunningTasks() *spanz.BtreeMap[*ScheduleTask] {

// AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (r *Manager) AdvanceCheckpoint(
- currentTables *TableRanges, currentPDTime time.Time,
+ currentTables *TableRanges,
+ currentPDTime time.Time,
+ barrier schedulepb.BarrierWithMinTs,
) (newCheckpointTs, newResolvedTs model.Ts) {
newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
slowestRange := tablepb.Span{}
@@ -575,6 +577,18 @@ func (r *Manager) AdvanceCheckpoint(
r.slowestTableID = slowestRange
}

+ // If currentTables is empty, we should advance newResolvedTs to global barrier ts and
+ // advance newCheckpointTs to min table barrier ts.
+ if newResolvedTs == math.MaxUint64 {
+ if newCheckpointTs != math.MaxUint64 {
+ log.Panic("newResolvedTs is max uint64 but newCheckpointTs is not",
+ zap.Uint64("newCheckpointTs", newCheckpointTs),
+ zap.Uint64("newResolvedTs", newResolvedTs))
+ }
+ newResolvedTs = barrier.GlobalBarrierTs
+ newCheckpointTs = barrier.MinTableBarrierTs
+ }

// If changefeed's checkpoint lag is larger than 30s,
// log the 4 slowest table infos every minute, which can
// help us find the problematic tables.
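The new tail of Manager.AdvanceCheckpoint is the piece that lets the deleted owner-side MaxUint64 handling go away: when no replicating table contributes a watermark, the scheduler now falls back to the barrier instead of returning MaxUint64. A compact restatement of that logic with the barrier fields passed as plain integers; this is an illustration, not the tiflow function itself.

```go
package main

import (
	"fmt"
	"math"
)

// fallbackToBarrier mirrors the added branch: if no table advanced the
// watermarks (both still MaxUint64), resolve to the global barrier and
// checkpoint to the min table barrier.
func fallbackToBarrier(newCheckpointTs, newResolvedTs, globalBarrierTs, minTableBarrierTs uint64) (uint64, uint64) {
	if newResolvedTs == math.MaxUint64 {
		if newCheckpointTs != math.MaxUint64 {
			panic("newResolvedTs is max uint64 but newCheckpointTs is not")
		}
		newResolvedTs = globalBarrierTs
		newCheckpointTs = minTableBarrierTs
	}
	return newCheckpointTs, newResolvedTs
}

func main() {
	cts, rts := fallbackToBarrier(math.MaxUint64, math.MaxUint64, 120, 110)
	fmt.Println(cts, rts) // 110 120
}
```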
Loading
