Merge branch 'master' into mysql_support_split_txns
CharlesCheung96 authored May 13, 2022
2 parents bc42747 + 7b7695b commit 9ffb23c
Showing 39 changed files with 959 additions and 693 deletions.
1 change: 1 addition & 0 deletions cdc/model/sink.go
@@ -471,6 +471,7 @@ type DDLEvent struct {
     PreTableInfo *SimpleTableInfo `msg:"pre-table-info"`
     Query string `msg:"query"`
     Type model.ActionType `msg:"-"`
+    Done bool `msg:"-"`
 }

 // RedoDDLEvent represents DDL event used in redo log persistent
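The only change to `DDLEvent` is the new `Done` flag. Its `msg:"-"` tag follows the usual struct-tag convention of excluding a field from generated serialization (the other fields carry `msg` tags for the msgp encoder), so `Done` is in-memory bookkeeping that never travels with the event. A minimal sketch of that convention, using `encoding/json` only so it runs without a code generator:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the idea behind DDLEvent.Done: Query is part of the wire
// format, while Done is runtime-only state excluded via the "-" tag.
// (DDLEvent itself uses msgp tags; encoding/json is used here only so the
// sketch runs without the msgp code generator.)
type Event struct {
	Query string `json:"query"`
	Done  bool   `json:"-"`
}

func main() {
	e := Event{Query: "CREATE TABLE t (id INT)", Done: true}
	out, _ := json.Marshal(e)
	fmt.Println(string(out)) // {"query":"CREATE TABLE t (id INT)"} — Done is not serialized
}
```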
51 changes: 32 additions & 19 deletions cdc/owner/changefeed.go
@@ -28,7 +28,7 @@ import (
     "github.com/pingcap/tiflow/cdc/contextutil"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/redo"
-    schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
+    "github.com/pingcap/tiflow/cdc/scheduler"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     cerror "github.com/pingcap/tiflow/pkg/errors"
     "github.com/pingcap/tiflow/pkg/orchestrator"
@@ -39,12 +39,33 @@ import (
     "go.uber.org/zap"
 )

+// newSchedulerV2FromCtx creates a new schedulerV2 from context.
+// This function is factored out to facilitate unit testing.
+func newSchedulerV2FromCtx(
+    ctx cdcContext.Context, startTs uint64,
+) (scheduler.Scheduler, error) {
+    changeFeedID := ctx.ChangefeedVars().ID
+    messageServer := ctx.GlobalVars().MessageServer
+    messageRouter := ctx.GlobalVars().MessageRouter
+    ownerRev := ctx.GlobalVars().OwnerRevision
+    ret, err := scheduler.NewScheduler(
+        ctx, changeFeedID, startTs, messageServer, messageRouter, ownerRev)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    return ret, nil
+}
+
+func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
+    return newSchedulerV2FromCtx(ctx, startTs)
+}
+
 type changefeed struct {
     id model.ChangeFeedID
     state *orchestrator.ChangefeedReactorState

     upStream *upstream.Upstream
-    scheduler scheduler
+    scheduler scheduler.Scheduler
     barriers *barriers
     feedStateManager *feedStateManager
     redoManager redo.LogManager
@@ -61,7 +82,7 @@ type changefeed struct {
     // a DDL job asynchronously. After the DDL job has been executed,
     // ddlEventCache will be set to nil. ddlEventCache contains more than
     // one event for a rename tables DDL job.
-    ddlEventCache map[*model.DDLEvent]bool
+    ddlEventCache []*model.DDLEvent
     // currentTableNames is the table names that the changefeed is watching.
     // And it contains only the tables of the ddl that have been processed.
     // The ones that have not been executed yet do not have.
@@ -85,7 +106,7 @@ type changefeed struct {

     newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error)
     newSink func() DDLSink
-    newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler, error)
+    newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
 }

 func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefeed {
@@ -233,7 +254,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
     }

     startTime := time.Now()
-    newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(ctx, c.state, c.schema.AllPhysicalTables(), captures)
+    newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
+        ctx, c.state.Status.CheckpointTs, c.schema.AllPhysicalTables(), captures)
     costTime := time.Since(startTime)
     if costTime > schedulerLogsWarnDuration {
         log.Warn("scheduler tick took too long",
@@ -249,7 +271,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed

     // CheckpointCannotProceed implies that not all tables are being replicated normally,
     // so in that case there is no need to advance the global watermarks.
-    if newCheckpointTs != schedulerv2.CheckpointCannotProceed {
+    if newCheckpointTs != scheduler.CheckpointCannotProceed {
         if newResolvedTs > barrierTs {
             newResolvedTs = barrierTs
         }
@@ -569,10 +591,7 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
     if err != nil {
         return false, errors.Trace(err)
     }
-    c.ddlEventCache = make(map[*model.DDLEvent]bool)
-    for _, event := range ddlEvents {
-        c.ddlEventCache[event] = false
-    }
+    c.ddlEventCache = ddlEvents
     for _, ddlEvent := range ddlEvents {
         if c.redoManager.Enabled() {
             err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
@@ -584,17 +603,11 @@
     }

     jobDone := true
-    for event, done := range c.ddlEventCache {
-        if done {
-            continue
-        }
+    for _, event := range c.ddlEventCache {
         eventDone, err := c.asyncExecDDLEvent(ctx, event)
         if err != nil {
             return false, err
         }
-        if eventDone {
-            c.ddlEventCache[event] = true
-        }
         jobDone = jobDone && eventDone
     }

@@ -669,8 +682,8 @@ func (c *changefeed) Close(ctx cdcContext.Context) {
 }

 // GetInfoProvider returns an InfoProvider if one is available.
-func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
-    if provider, ok := c.scheduler.(schedulerv2.InfoProvider); ok {
+func (c *changefeed) GetInfoProvider() scheduler.InfoProvider {
+    if provider, ok := c.scheduler.(scheduler.InfoProvider); ok {
         return provider
     }
     return nil
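Two threads run through this file: the owner now builds its scheduler through the exported `scheduler.Scheduler` interface (with `newSchedulerV2FromCtx` factored out for unit testing), and `ddlEventCache` becomes a plain slice, with per-event completion tracked on the event itself rather than in a `map[*model.DDLEvent]bool`. The sketch below is a self-contained toy of that second pattern; `ddlEvent` and `asyncExecEvent` are hypothetical stand-ins, not TiCDC APIs:

```go
package main

import "fmt"

// ddlEvent is a stand-in for model.DDLEvent; Done records whether the sink
// has finished executing this particular statement of a multi-statement job
// (e.g. a RENAME TABLES job that expands into several events).
type ddlEvent struct {
	Query string
	Done  bool
}

// asyncExecEvent is a hypothetical stand-in for asyncExecDDLEvent: it is
// assumed to be idempotent and returns true once the event has really been
// executed by the DDL sink.
func asyncExecEvent(ev *ddlEvent) (bool, error) {
	ev.Done = true // pretend the sink finishes the event on this call
	return ev.Done, nil
}

func main() {
	cache := []*ddlEvent{
		{Query: "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`"},
		{Query: "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`"},
	}

	// Mirrors the shape of the loop in asyncExecDDLJob after this commit:
	// the job is done only when every cached event reports done; events that
	// are not done yet are simply retried on the next tick.
	jobDone := true
	for _, ev := range cache {
		eventDone, err := asyncExecEvent(ev)
		if err != nil {
			panic(err)
		}
		jobDone = jobDone && eventDone
	}
	fmt.Println("jobDone:", jobDone)
}
```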
45 changes: 24 additions & 21 deletions cdc/owner/changefeed_test.go
@@ -28,6 +28,7 @@ import (
     timodel "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tiflow/cdc/entry"
     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/cdc/scheduler"
     "github.com/pingcap/tiflow/pkg/config"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     "github.com/pingcap/tiflow/pkg/orchestrator"
@@ -147,8 +148,8 @@ type mockScheduler struct {
 }

 func (m *mockScheduler) Tick(
-    ctx cdcContext.Context,
-    state *orchestrator.ChangefeedReactorState,
+    ctx context.Context,
+    checkpointTs model.Ts,
     currentTables []model.TableID,
     captures map[model.CaptureID]*model.CaptureInfo,
 ) (newCheckpointTs, newResolvedTs model.Ts, err error) {
@@ -163,7 +164,7 @@ func (m *mockScheduler) MoveTable(tableID model.TableID, target model.CaptureID)
 func (m *mockScheduler) Rebalance() {}

 // Close closes the scheduler and releases resources.
-func (m *mockScheduler) Close(ctx cdcContext.Context) {}
+func (m *mockScheduler) Close(ctx context.Context) {}

 func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
     *changefeed, *orchestrator.ChangefeedReactorState,
@@ -183,7 +184,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
             recordDDLHistory: false,
         }
     })
-    cf.newScheduler = func(ctx cdcContext.Context, startTs uint64) (scheduler, error) {
+    cf.newScheduler = func(
+        ctx cdcContext.Context, startTs uint64,
+    ) (scheduler.Scheduler, error) {
         return &mockScheduler{}, nil
     }
     cf.upStream = upStream
@@ -805,12 +808,12 @@ func TestExecRenameTablesDDL(t *testing.T) {
             oldTableIDs = append(oldTableIDs, job.TableID)
         }
         require.Nil(t, err)
-        require.Equal(t, done, false)
+        require.Equal(t, false, done)
         require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
         mockDDLSink.ddlDone = true
         done, err = cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, true)
+        require.Equal(t, true, done)
         require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
     }

@@ -852,19 +855,19 @@ func TestExecRenameTablesDDL(t *testing.T) {
     mockDDLSink.recordDDLHistory = true
     done, err := cf.asyncExecDDLJob(ctx, job)
     require.Nil(t, err)
-    require.Equal(t, done, false)
+    require.Equal(t, false, done)
     require.Len(t, mockDDLSink.ddlHistory, 2)
-    require.Equal(t, mockDDLSink.ddlHistory[0],
-        "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`")
-    require.Equal(t, mockDDLSink.ddlHistory[1],
-        "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`")
+    require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
+        mockDDLSink.ddlHistory[0])
+    require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
+        mockDDLSink.ddlHistory[1])

     // mock all of the rename table statements have been done
     mockDDLSink.resetDDLDone = false
     mockDDLSink.ddlDone = true
     done, err = cf.asyncExecDDLJob(ctx, job)
     require.Nil(t, err)
-    require.Equal(t, done, true)
+    require.Equal(t, true, done)
 }

 func TestExecDropTablesDDL(t *testing.T) {
@@ -886,12 +889,12 @@ func TestExecDropTablesDDL(t *testing.T) {
         job := helper.DDL2Job(actualDDL)
         done, err := cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, false)
+        require.Equal(t, false, done)
         require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
         mockDDLSink.ddlDone = true
         done, err = cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, true)
+        require.Equal(t, true, done)
     }

     execCreateStmt("create database test1",
@@ -909,12 +912,12 @@ func TestExecDropTablesDDL(t *testing.T) {
     execDropStmt := func(job *timodel.Job, expectedDDL string) {
         done, err := cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, false)
+        require.Equal(t, false, done)
         require.Equal(t, mockDDLSink.ddlExecuting.Query, expectedDDL)
         mockDDLSink.ddlDone = true
         done, err = cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, true)
+        require.Equal(t, true, done)
     }

     execDropStmt(jobs[0], "DROP TABLE `test1`.`tb2`")
@@ -940,12 +943,12 @@ func TestExecDropViewsDDL(t *testing.T) {
         job := helper.DDL2Job(actualDDL)
         done, err := cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, false)
+        require.Equal(t, false, done)
         require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
         mockDDLSink.ddlDone = true
         done, err = cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, true)
+        require.Equal(t, true, done)
     }
     execCreateStmt("create database test1",
         "CREATE DATABASE `test1`")
@@ -970,12 +973,12 @@ func TestExecDropViewsDDL(t *testing.T) {
     execDropStmt := func(job *timodel.Job, expectedDDL string) {
         done, err := cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, false)
-        require.Equal(t, mockDDLSink.ddlExecuting.Query, expectedDDL)
+        require.Equal(t, false, done)
+        require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
         mockDDLSink.ddlDone = true
         done, err = cf.asyncExecDDLJob(ctx, job)
         require.Nil(t, err)
-        require.Equal(t, done, true)
+        require.Equal(t, true, done)
     }

     execDropStmt(jobs[0], "DROP VIEW `test1`.`view2`")
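Besides adapting `mockScheduler` to the new `scheduler.Scheduler` signature, most of the test edits flip `require.Equal` arguments so the expected value comes first; testify declares `Equal(t, expected, actual, ...)`, so the swap makes failure output label the two values correctly. A tiny standalone illustration (not part of the TiCDC suite):

```go
package example_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestExpectedFirst shows the testify convention the commit moves toward:
// require.Equal takes the expected value first and the actual value second,
// so a failure prints the two sides under the right labels.
func TestExpectedFirst(t *testing.T) {
	done := false // stand-in for the value returned by asyncExecDDLJob

	require.Equal(t, false, done) // expected first, actual second

	// require.Equal(t, done, false) would also pass here, but on a failure
	// the "expected:"/"actual:" labels in the diff would be swapped.
}
```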
21 changes: 7 additions & 14 deletions cdc/owner/ddl_sink.go
@@ -65,9 +65,6 @@ type ddlSinkImpl struct {
         checkpointTs model.Ts
         currentTableNames []model.TableName
     }
-    // ddlFinishedTsMap is used to check whether a ddl event in a ddl job has
-    // been executed successfully.
-    ddlFinishedTsMap sync.Map
     // ddlSentTsMap is used to check whether a ddl event in a ddl job has been
     // sent to `ddlCh` successfully.
     ddlSentTsMap map[*model.DDLEvent]model.Ts
@@ -198,10 +195,10 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
                 zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
                 zap.Bool("ignored", err != nil),
                 zap.Any("ddl", ddl))
-            s.ddlFinishedTsMap.Store(ddl, ddl.CommitTs)
             // Force emitting checkpoint ts when a ddl event is finished.
             // Otherwise, a kafka consumer may not execute that ddl event.
             s.mu.Lock()
+            ddl.Done = true
             checkpointTs := s.mu.checkpointTs
             if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
                 s.mu.Unlock()
@@ -242,22 +239,18 @@ func (s *ddlSinkImpl) emitCheckpointTs(ts uint64, tableNames []model.TableName)
 // and CommitTs. So in emitDDLEvent, we get the DDL finished ts of an event
 // from a map in order to check whether that event is finshed or not.
 func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
-    var ddlFinishedTs model.Ts
-
-    ts, ok := s.ddlFinishedTsMap.Load(ddl)
-    if ok {
-        ddlFinishedTs = ts.(model.Ts)
-    }
-    if ddl.CommitTs <= ddlFinishedTs {
+    s.mu.Lock()
+    if ddl.Done {
         // the DDL event is executed successfully, and done is true
         log.Info("ddl already executed",
             zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
             zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
-            zap.Uint64("ddlFinishedTs", ddlFinishedTs), zap.Any("DDL", ddl))
-        s.ddlFinishedTsMap.Delete(ddl)
+            zap.Any("DDL", ddl))
         delete(s.ddlSentTsMap, ddl)
+        s.mu.Unlock()
         return true, nil
     }
+    s.mu.Unlock()

     ddlSentTs := s.ddlSentTsMap[ddl]
     if ddl.CommitTs <= ddlSentTs {
@@ -282,7 +275,7 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent)
             zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
             zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
             zap.Uint64("ddlSentTs", ddlSentTs),
-            zap.Uint64("ddlFinishedTs", ddlFinishedTs), zap.Any("DDL", ddl))
+            zap.Any("DDL", ddl))
         // if this hit, we think that ddlCh is full,
         // just return false and send the ddl in the next round.
     }
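This is the sink-side half of the `Done` flag: instead of recording finished commit timestamps in the `ddlFinishedTsMap` `sync.Map`, `run` now marks `ddl.Done = true` under `s.mu` once the DDL has been executed, and `emitDDLEvent` checks the same flag under the same mutex to decide whether to skip re-execution. Below is a stripped-down sketch of that handshake with simplified stand-in names (`sink`, `event`, `emit`), not the `ddlSinkImpl` code itself:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// event is a simplified stand-in for model.DDLEvent after this commit:
// Done replaces the old ddlFinishedTsMap lookup keyed by event pointer.
type event struct {
	Query string
	Done  bool
}

// sink is a stripped-down stand-in for ddlSinkImpl: one goroutine executes
// DDLs and marks them done under mu; emit checks the flag under the same mu.
type sink struct {
	mu     sync.Mutex
	ddlCh  chan *event
	sentTs map[*event]bool // simplified ddlSentTsMap: has this event been queued?
}

func newSink() *sink {
	s := &sink{ddlCh: make(chan *event, 1), sentTs: map[*event]bool{}}
	go func() {
		for ddl := range s.ddlCh {
			time.Sleep(10 * time.Millisecond) // pretend to execute the DDL downstream
			s.mu.Lock()
			ddl.Done = true // what run() does once the DDL has been executed
			s.mu.Unlock()
		}
	}()
	return s
}

// emit mirrors the shape of emitDDLEvent: return true once the event is done,
// otherwise enqueue it (at most once) and ask the caller to retry later.
func (s *sink) emit(ddl *event) bool {
	s.mu.Lock()
	if ddl.Done {
		delete(s.sentTs, ddl)
		s.mu.Unlock()
		return true
	}
	s.mu.Unlock()

	if !s.sentTs[ddl] {
		select {
		case s.ddlCh <- ddl:
			s.sentTs[ddl] = true
		default: // channel full; try again on the next call
		}
	}
	return false
}

func main() {
	s := newSink()
	ddl := &event{Query: "DROP TABLE `test1`.`tb2`"}
	for !s.emit(ddl) {
		time.Sleep(5 * time.Millisecond)
	}
	fmt.Println("ddl executed exactly once:", ddl.Query)
}
```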