Commit

refactor some code

CharlesCheung96 committed May 18, 2023
1 parent 8e11bbd commit 53a3ef0
Showing 4 changed files with 54 additions and 64 deletions.
6 changes: 3 additions & 3 deletions cdc/model/owner.go
@@ -262,9 +262,9 @@ func (p ProcessorsInfos) String() string {
type ChangeFeedStatus struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
-	// MinTableBarrierTs is the minimum table barrier timestamp of all tables.
-	// It is only used when a changefeed is started to check whether there was
-	// a table's DDL job that had not finished when the changefeed was stopped.
+	// MinTableBarrierTs is the minimum commitTs of all DDL events and is only
+	// used to check whether there is a pending DDL job at the checkpointTs when
+	// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
AdminJobType AdminJobType `json:"admin-job-type"`
}
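
The rewritten comment describes a concrete initialization-time check. A minimal sketch of that check (hypothetical helper, not part of this commit; it assumes the pending-DDL condition is a simple equality against the checkpoint):

package model

// hasPendingDDLAtCheckpoint sketches the test the new comment describes:
// MinTableBarrierTs is the minimum commitTs of all unfinished DDL events,
// so if it still equals the checkpoint when a changefeed is initialized,
// a DDL job at checkpointTs may not have finished yet.
func hasPendingDDLAtCheckpoint(status *ChangeFeedStatus) bool {
	return status.MinTableBarrierTs == status.CheckpointTs
}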
26 changes: 11 additions & 15 deletions cdc/owner/changefeed.go
@@ -326,11 +326,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
barrier.GlobalBarrierTs = otherBarrierTs
}

-	if barrier.minTableBarrierTs > otherBarrierTs {
+	if barrier.minDDLBarrierTs > otherBarrierTs {
log.Debug("There are other barriers less than min table barrier, wait for them",
zap.Uint64("otherBarrierTs", otherBarrierTs),
zap.Uint64("ddlBarrierTs", barrier.GlobalBarrierTs))
-		barrier.minTableBarrierTs = otherBarrierTs
+		barrier.minDDLBarrierTs = otherBarrierTs
}

log.Debug("owner handles barrier",
@@ -339,7 +339,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minTableBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.minDDLBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))

if barrier.GlobalBarrierTs < checkpointTs {
@@ -352,8 +352,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, checkpointTs, allPhysicalTables, captures, barrier.Barrier)
-	// metricsResolvedTs to store the min resolved ts among all tables and show it in metrics
-	metricsResolvedTs := newResolvedTs
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
@@ -387,14 +385,14 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// If the owner is just initialized, minTableBarrierTs can be `checkpointTs-1`.
// In such case the `newCheckpointTs` may be larger than the minTableBarrierTs,
// but it shouldn't be, so we need to handle it here.
-	if newCheckpointTs > barrier.minTableBarrierTs {
-		newCheckpointTs = barrier.minTableBarrierTs
+	if newCheckpointTs > barrier.minDDLBarrierTs {
+		newCheckpointTs = barrier.minDDLBarrierTs
}

prevResolvedTs := c.state.Status.ResolvedTs
if c.redoMetaMgr.Enabled() {
-		if newResolvedTs > barrier.physicalTableBarrierTs {
-			newResolvedTs = barrier.physicalTableBarrierTs
+		if newResolvedTs > barrier.redoBarrierTs {
+			newResolvedTs = barrier.redoBarrierTs
}
// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
// we can only upload it to etcd after it has been flushed into redo meta.
Expand All @@ -416,7 +414,6 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
} else {
newResolvedTs = prevResolvedTs
}
-		metricsResolvedTs = newResolvedTs
}
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
@@ -428,12 +425,11 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
-		metricsResolvedTs = newResolvedTs
}

// MinTableBarrierTs should never regress
-	if barrier.minTableBarrierTs < c.state.Status.MinTableBarrierTs {
-		barrier.minTableBarrierTs = c.state.Status.MinTableBarrierTs
+	if barrier.minDDLBarrierTs < c.state.Status.MinTableBarrierTs {
+		barrier.minDDLBarrierTs = c.state.Status.MinTableBarrierTs
}

failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
Expand All @@ -447,8 +443,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

-	c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minTableBarrierTs)
-	c.updateMetrics(currentTs, newCheckpointTs, metricsResolvedTs)
+	c.updateStatus(newCheckpointTs, newResolvedTs, barrier.minDDLBarrierTs)
+	c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
c.tickDownstreamObserver(ctx)

return nil
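
With the renames in place, the two clamps in this tick read as independent guards: the checkpoint is capped by the minimum DDL barrier, and (when redo is enabled) the resolved ts is capped by the redo barrier. A condensed sketch of just those two guards (hypothetical standalone function with a simplified signature; in the commit this logic lives inline in tick):

package owner

// clampTs applies the two barrier guards from the hunks above: the new
// checkpoint may never pass the minimum DDL barrier, and when redo is
// enabled the new resolved ts may never pass the redo barrier.
func clampTs(checkpointTs, resolvedTs, minDDLBarrierTs, redoBarrierTs uint64, redoEnabled bool) (uint64, uint64) {
	if checkpointTs > minDDLBarrierTs {
		checkpointTs = minDDLBarrierTs
	}
	if redoEnabled && resolvedTs > redoBarrierTs {
		resolvedTs = redoBarrierTs
	}
	return checkpointTs, resolvedTs
}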
82 changes: 38 additions & 44 deletions cdc/owner/ddl_manager.go
@@ -89,13 +89,12 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{

type ddlBarrier struct {
*schedulepb.Barrier
-	// minTableBarrierTs is the minimum table barrier timestamp of all tables.
-	// It is only used when a changefeed is started to check whether there was
-	// a table's DDL job that had not finished when the changefeed was stopped.
-	minTableBarrierTs model.Ts
-	// physicalTableBarrierTs is the minimum ts of all ddl events that create
-	// a new physical table.
-	physicalTableBarrierTs model.Ts
+	// minDDLBarrierTs is the minimum commitTs of all DDL events and is only
+	// used to check whether there is a pending DDL job at the checkpointTs when
+	// initializing the changefeed.
+	minDDLBarrierTs model.Ts
+	// redoBarrierTs is the minimum ts of ddl events that create a new physical table.
+	redoBarrierTs model.Ts
}

// ddlManager holds the pending DDL events of all tables and responsible for
@@ -442,71 +441,66 @@ func (m *ddlManager) barrier() *ddlBarrier {
Barrier: &schedulepb.Barrier{
GlobalBarrierTs: m.ddlResolvedTs,
},
-		minTableBarrierTs:      m.ddlResolvedTs,
-		physicalTableBarrierTs: m.ddlResolvedTs,
+		minDDLBarrierTs: m.ddlResolvedTs,
+		redoBarrierTs:   m.ddlResolvedTs,
}
tableBarrierMap := make(map[model.TableID]model.Ts)
ddls := m.getAllTableNextDDL()
if m.justSentDDL != nil {
ddls = append(ddls, m.justSentDDL)
}

for _, ddl := range ddls {
-		if m.redoMetaManager.Enabled() && isRedoBerrierDDL(ddl) {
+		if ddl.CommitTs < barrier.minDDLBarrierTs {
+			barrier.minDDLBarrierTs = ddl.CommitTs
+		}
+		if m.redoMetaManager.Enabled() && isRedoBarrierDDL(ddl) {
// The pipeline for a new table does not exist until the ddl is successfully
// executed, so the table's resolvedTs will not be calculated in redo.
-			// To solve this problem, resovedTs of redoDMLManager should not be greater
-			// than the min commitTs of create table DDL.
-			if ddl.CommitTs < barrier.physicalTableBarrierTs {
-				barrier.physicalTableBarrierTs = ddl.CommitTs
+			// To solve this problem, resolvedTs of redo manager should not be greater
+			// than the min commitTs of ddls that create a new physical table.
+			if ddl.CommitTs < barrier.redoBarrierTs {
+				barrier.redoBarrierTs = ddl.CommitTs
}
}
-		// When there is a global DDL, we need to wait all tables
-		// checkpointTs reach its commitTs before we can execute it.
if isGlobalDDL(ddl) {
+			// When there is a global DDL, we need to wait for the checkpointTs
+			// of all tables to reach its commitTs before we can execute it.
if ddl.CommitTs < barrier.GlobalBarrierTs {
barrier.GlobalBarrierTs = ddl.CommitTs
}
} else {
-			ids := getPhysicalTableIDs(ddl)
+			// barrier related physical tables
+			ids := getRelatedPhysicalTableIDs(ddl)
for _, id := range ids {
tableBarrierMap[id] = ddl.CommitTs
}
}

-		// minTableBarrierTs is the min commitTs of all tables DDLs,
-		// it is used to prevent the checkpointTs from advancing too fast
-		// when a changefeed is just resumed.
-		if ddl.CommitTs < barrier.minTableBarrierTs {
-			barrier.minTableBarrierTs = ddl.CommitTs
-		}
}

-	for tb, barrierTs := range tableBarrierMap {
-		if barrierTs > barrier.GlobalBarrierTs {
-			delete(tableBarrierMap, tb)
+	// calculate tableBarriers
+	var tableBarriers []*schedulepb.TableBarrier
+	for tableID, tableBarrierTs := range tableBarrierMap {
+		if tableBarrierTs > barrier.GlobalBarrierTs {
+			continue
}
-	}

-	var tableBarrier []*schedulepb.TableBarrier
-	for tb, barrierTs := range tableBarrierMap {
-		tableBarrier = append(tableBarrier, &schedulepb.TableBarrier{
-			TableID:   tb,
-			BarrierTs: barrierTs,
+		tableBarriers = append(tableBarriers, &schedulepb.TableBarrier{
+			TableID:   tableID,
+			BarrierTs: tableBarrierTs,
})
}

// Limit the tableBarrier size to avoid too large barrier. Since it will
// cause the scheduler to be slow.
-	sort.Slice(tableBarrier, func(i, j int) bool {
-		return tableBarrier[i].BarrierTs < tableBarrier[j].BarrierTs
+	sort.Slice(tableBarriers, func(i, j int) bool {
+		return tableBarriers[i].BarrierTs < tableBarriers[j].BarrierTs
})
-	if len(tableBarrier) > tableBarrierNumberLimit {
-		barrier.GlobalBarrierTs = tableBarrier[tableBarrierNumberLimit].BarrierTs
-		tableBarrier = tableBarrier[:tableBarrierNumberLimit]
+	if len(tableBarriers) > tableBarrierNumberLimit {
+		barrier.GlobalBarrierTs = tableBarriers[tableBarrierNumberLimit].BarrierTs
+		tableBarriers = tableBarriers[:tableBarrierNumberLimit]
}

m.justSentDDL = nil
-	barrier.TableBarriers = tableBarrier
+	barrier.TableBarriers = tableBarriers
return barrier
}
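
The capping step at the end of barrier() preserves a useful invariant: every table barrier dropped by the limit has a BarrierTs no smaller than the new GlobalBarrierTs, so the dropped tables remain covered by the global barrier. A standalone sketch of the same step (illustrative type, not the schedulepb definition): with limit = 2 and sorted barrier timestamps [10, 20, 30, 40], the result keeps [10, 20] and lowers the global barrier to 30.

package owner

import "sort"

// tableBarrier mirrors schedulepb.TableBarrier for illustration only.
type tableBarrier struct {
	TableID   int64
	BarrierTs uint64
}

// capTableBarriers keeps at most limit per-table barriers; when it drops
// the tail, it lowers the global barrier to the first dropped BarrierTs so
// the dropped tables stay protected.
func capTableBarriers(tbs []tableBarrier, globalTs uint64, limit int) ([]tableBarrier, uint64) {
	sort.Slice(tbs, func(i, j int) bool { return tbs[i].BarrierTs < tbs[j].BarrierTs })
	if len(tbs) > limit {
		globalTs = tbs[limit].BarrierTs
		tbs = tbs[:limit]
	}
	return tbs, globalTs
}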

@@ -597,9 +591,9 @@ func (m *ddlManager) cleanCache() {
m.physicalTablesCache = nil
}

-// getPhysicalTableIDs get all related physical table ids of a ddl event.
+// getRelatedPhysicalTableIDs gets all related physical table ids of a ddl event.
// It is a helper function to calculate tableBarrier.
-func getPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
+func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
res := make([]model.TableID, 0, 1)
table := ddl.TableInfo
if ddl.PreTableInfo != nil {
Expand All @@ -626,7 +620,7 @@ func isGlobalDDL(ddl *model.DDLEvent) bool {
return !ok
}

-func isRedoBerrierDDL(ddl *model.DDLEvent) bool {
+func isRedoBarrierDDL(ddl *model.DDLEvent) bool {
_, ok := redoBarrierDDLs[ddl.Type]
return ok
}
4 changes: 2 additions & 2 deletions cdc/owner/ddl_manager_test.go
@@ -120,7 +120,7 @@ func TestBarriers(t *testing.T) {
// advance the ddlResolvedTs
dm.ddlResolvedTs = 6
ddlBarrier := dm.barrier()
-	minTableBarrierTs, barrier := ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier
+	minTableBarrierTs, barrier := ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier
require.Equal(t, expectedMinTableBarrier, minTableBarrierTs)
require.Equal(t, expectedBarrier, barrier)

@@ -134,7 +134,7 @@ func TestBarriers(t *testing.T) {
newFakeDDLEvent(tableID, tableName.Table, timodel.ActionAddColumn, uint64(i)))
}
ddlBarrier = dm.barrier()
-	minTableBarrierTs, barrier = ddlBarrier.minTableBarrierTs, ddlBarrier.Barrier
+	minTableBarrierTs, barrier = ddlBarrier.minDDLBarrierTs, ddlBarrier.Barrier
require.Equal(t, uint64(0), minTableBarrierTs)
require.Equal(t, uint64(256), barrier.GlobalBarrierTs)
require.Equal(t, 256, len(barrier.TableBarriers))
