owner: prevent table start-ts regressions (#1236) #1352

Merged
38 changes: 33 additions & 5 deletions cdc/changefeed.go
@@ -79,6 +79,10 @@ type changeFeed struct {
id string
info *model.ChangeFeedInfo
status *model.ChangeFeedStatus
// appliedCheckpointTs is the latest checkpointTs that has already been flushed to Etcd.
// Any operation that assumes the global checkpoint has progressed must check this field
// to make sure that progress is actually visible to the processors.
appliedCheckpointTs uint64

schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
@@ -292,6 +296,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
return nil
}

// Do NOT rebalance orphan tables before the checkpoint ts has advanced to the FinishTs of a DDL
if c.appliedCheckpointTs != c.status.CheckpointTs {
return nil
}

captureIDs := make(map[model.CaptureID]struct{}, len(captures))
cleanedTables := make(map[model.TableID]struct{})
addedTables := make(map[model.TableID]struct{})
@@ -532,9 +541,20 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
job.Status = model.MoveTableStatusDeleted
log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job))
case model.MoveTableStatusDeleted:
// Do NOT dispatch tables before the checkpoint ts has been flushed to Etcd.
if c.appliedCheckpointTs != c.status.CheckpointTs {
log.Debug("handle the move job, waiting for checkpoint ts to be uploaded",
zap.Uint64("applied-checkpoint-ts", c.appliedCheckpointTs),
zap.Uint64("latest-checkpoint-ts", c.status.CheckpointTs))
continue
}

// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
if replicaInfo.StartTs < c.status.CheckpointTs {
replicaInfo.StartTs = c.status.CheckpointTs
}
if !exist {
// the target capture does not exist, add the table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs
@@ -614,6 +634,7 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
if c.ddlState != model.ChangeFeedWaitToExecDDL {
return nil
}

if len(c.ddlJobHistory) == 0 {
log.Panic("ddl job history can not be empty in changefeed when should to execute DDL")
}
@@ -624,14 +645,21 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}

if c.status.CheckpointTs != todoDDLJob.BinlogInfo.FinishedTS {
if c.appliedCheckpointTs < todoDDLJob.BinlogInfo.FinishedTS-1 {
log.Debug("wait checkpoint ts",
zap.Uint64("checkpoint ts", c.status.CheckpointTs),
zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS),
zap.String("ddl query", todoDDLJob.Query))
return nil
}

if c.appliedCheckpointTs >= todoDDLJob.BinlogInfo.FinishedTS {
log.Panic("applied checkpoint ts is larger than DDL finish ts",
zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS))
}

log.Info("apply job", zap.Stringer("job", todoDDLJob),
zap.String("schema", todoDDLJob.SchemaName),
zap.String("query", todoDDLJob.Query),
@@ -669,10 +697,6 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}

err = c.balanceOrphanTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
executed := false
if !c.cyclicEnabled || c.info.Config.Cyclic.SyncDDL {
failpoint.Inject("InjectChangefeedDDLError", func() {
@@ -867,6 +891,10 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
c.ddlTs = minResolvedTs
}

if len(c.ddlJobHistory) > 0 && minCheckpointTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS {
minCheckpointTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS - 1
}

// if the downstream sink is an MQ sink, it does not guarantee that the checkpoint is less than globalResolvedTs
if minCheckpointTs > minResolvedTs {
minCheckpointTs = minResolvedTs
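The common thread in the changefeed.go changes above is a floor on any table start-ts the owner hands out: a dispatched table must not start below the changefeed's checkpoint. A minimal standalone sketch of that clamp, with a made-up helper and example values (the PR itself inlines the comparison in handleMoveTableJobs):

```go
package main

import "fmt"

// clampStartTs lifts a table start-ts that has fallen behind the
// changefeed checkpoint, so a re-dispatched table cannot re-read rows
// that the changefeed has already replicated.
func clampStartTs(startTs, checkpointTs uint64) uint64 {
	if startTs < checkpointTs {
		return checkpointTs
	}
	return startTs
}

func main() {
	fmt.Println(clampStartTs(90, 100))  // 100: a regressed start-ts is lifted
	fmt.Println(clampStartTs(120, 100)) // 120: a newer start-ts is kept
}
```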
69 changes: 44 additions & 25 deletions cdc/owner.go
@@ -132,6 +132,10 @@ func NewOwner(
cli := kv.NewCDCEtcdClient(ctx, sess.Client())
endpoints := sess.Client().Endpoints()

failpoint.Inject("ownerFlushIntervalInject", func(val failpoint.Value) {
flushChangefeedInterval = time.Millisecond * time.Duration(val.(int))
})

owner := &Owner{
done: make(chan struct{}),
session: sess,
@@ -187,8 +191,15 @@ func (o *Owner) removeCapture(info *model.CaptureInfo) {
startTs = feed.status.CheckpointTs
}

for tableID := range task.Tables {
for tableID, replicaInfo := range task.Tables {
feed.orphanTables[tableID] = startTs
if startTs < replicaInfo.StartTs {
log.Warn("table startTs not consistent",
zap.Uint64("table-start-ts", replicaInfo.StartTs),
zap.Uint64("checkpoint-ts", startTs),
zap.Reflect("status", feed.status))
feed.orphanTables[tableID] = replicaInfo.StartTs
}
}

ctx := context.TODO()
@@ -412,23 +423,24 @@ func (o *Owner) newChangeFeed(
ResolvedTs: 0,
CheckpointTs: checkpointTs,
},
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncpointStore: syncpointStore,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
cancel: cancel,
appliedCheckpointTs: checkpointTs,
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncpointStore: syncpointStore,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
cancel: cancel,
}
return cf, nil
}
@@ -672,8 +684,8 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
for id, changefeed := range o.changeFeeds {
snapshot[id] = changefeed.status
if changefeed.status.CheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.status.CheckpointTs
if changefeed.appliedCheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.appliedCheckpointTs
}

phyTs := oracle.ExtractPhysical(changefeed.status.CheckpointTs)
@@ -687,6 +699,9 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
for id, changefeedStatus := range snapshot {
o.changeFeeds[id].appliedCheckpointTs = changefeedStatus.CheckpointTs
}
o.lastFlushChangefeeds = time.Now()
}
}
@@ -1187,26 +1202,28 @@ func (o *Owner) run(ctx context.Context) error {
return errors.Trace(err)
}

err = o.calcResolvedTs(ctx)
err = o.handleDDL(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleDDL(ctx)
err = o.handleSyncPoint(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleSyncPoint(ctx)
err = o.handleAdminJob(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleAdminJob(ctx)
err = o.calcResolvedTs(ctx)
if err != nil {
return errors.Trace(err)
}

// It is better for flushChangeFeedInfos to follow calcResolvedTs immediately,
// because operations such as handleDDL and rebalancing rely on proper progress of the checkpoint in Etcd.
err = o.flushChangeFeedInfos(ctx)
if err != nil {
return errors.Trace(err)
@@ -1323,7 +1340,9 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context) error {
for tableID, replicaInfo := range status.Tables {
startTs := replicaInfo.StartTs
if taskPosFound {
startTs = pos.CheckPointTs
if startTs < pos.CheckPointTs {
startTs = pos.CheckPointTs
}
}
o.addOrphanTable(changeFeedID, tableID, startTs)
}
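The appliedCheckpointTs field introduced above only advances once flushChangeFeedInfos has successfully written the statuses to Etcd, which is what lets balanceOrphanTables, handleMoveTableJobs, and handleDDL gate themselves on progress that the processors can actually observe. A simplified sketch of that bookkeeping, with a hypothetical feed type standing in for changeFeed:

```go
package main

import "fmt"

// feed is a stand-in for the changeFeed struct; only the two fields
// relevant to this PR are modeled.
type feed struct {
	checkpointTs        uint64 // latest checkpoint computed by the owner
	appliedCheckpointTs uint64 // latest checkpoint known to be persisted in Etcd
}

// flushStatuses persists the current checkpoints and, only on success,
// marks them as applied, mirroring the order used in flushChangeFeedInfos.
func flushStatuses(feeds map[string]*feed, put func(map[string]uint64) error) error {
	snapshot := make(map[string]uint64, len(feeds))
	for id, f := range feeds {
		snapshot[id] = f.checkpointTs
	}
	if err := put(snapshot); err != nil {
		return err // nothing becomes "applied" if the Etcd write failed
	}
	for id, ts := range snapshot {
		feeds[id].appliedCheckpointTs = ts
	}
	return nil
}

func main() {
	feeds := map[string]*feed{"cf-1": {checkpointTs: 105, appliedCheckpointTs: 100}}
	_ = flushStatuses(feeds, func(map[string]uint64) error { return nil })
	fmt.Println(feeds["cf-1"].appliedCheckpointTs) // 105 once the write succeeded
}
```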
2 changes: 1 addition & 1 deletion cdc/owner_test.go
@@ -922,8 +922,8 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(len(owner.captures), check.Equals, 1)
c.Assert(owner.captures, check.HasKey, capture.info.ID)
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 110})
c.Assert(atomic.LoadInt32(&owner.captureLoaded), check.Equals, int32(1))
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100})
// check stale tasks are cleaned up
statuses, err = s.client.GetAllTaskStatus(ctx, changefeed)
c.Assert(err, check.IsNil)
16 changes: 16 additions & 0 deletions cdc/processor.go
@@ -225,6 +225,15 @@ func newProcessor(
p.status = status
p.statusModRevision = modRevision

info, _, err := p.etcdCli.GetChangeFeedStatus(ctx, p.changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, errors.Trace(err)
}

if err == nil {
p.globalcheckpointTs = info.CheckpointTs
}

for tableID, replicaInfo := range p.status.Tables {
p.addTable(ctx, tableID, replicaInfo)
}
@@ -770,6 +779,8 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
// use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized.
// The cdc_state_checker will catch a real inconsistency in integration tests.
log.Warn("addTable: startTs < checkpoint",
util.ZapFieldChangefeed(ctx),
zap.Int64("tableID", tableID),
@@ -1074,6 +1085,11 @@ func (p *processor) sorterConsume(
}
return
}

if checkpointTs < replicaInfo.StartTs {
checkpointTs = replicaInfo.StartTs
}

if checkpointTs != 0 {
atomic.StoreUint64(pCheckpointTs, checkpointTs)
p.localCheckpointTsNotifier.Notify()
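In processor.go the processor now seeds its global checkpoint from the changefeed status in Etcd at startup and treats a missing status as non-fatal, which is why addTable can warn rather than panic when a start-ts looks stale. A rough sketch of that tolerant startup read, with hypothetical names in place of the real Etcd client and error types:

```go
package main

import (
	"errors"
	"fmt"
)

// errStatusNotFound stands in for cerror.ErrChangeFeedNotExists; the real
// code matches the pingcap error type rather than a sentinel.
var errStatusNotFound = errors.New("changefeed status not found")

// loadGlobalCheckpoint mirrors the startup path added to newProcessor:
// a missing status is tolerated (the processor keeps a zero checkpoint),
// while any other error aborts initialization.
func loadGlobalCheckpoint(get func() (uint64, error)) (uint64, error) {
	ts, err := get()
	switch {
	case err == nil:
		return ts, nil
	case errors.Is(err, errStatusNotFound):
		return 0, nil
	default:
		return 0, err
	}
}

func main() {
	ts, err := loadGlobalCheckpoint(func() (uint64, error) { return 0, errStatusNotFound })
	fmt.Println(ts, err) // 0 <nil>: a missing status is not fatal at startup
}
```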
9 changes: 7 additions & 2 deletions pkg/orchestrator/etcd_worker.go
@@ -136,7 +136,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, timerInterval time.Duration)

// We are safe to update the ReactorState only if there is no pending patch.
for _, update := range worker.pendingUpdates {
err := worker.state.Update(update.key, update.value)
err := worker.state.Update(update.key, update.value, false)
if err != nil {
return errors.Trace(err)
}
@@ -181,7 +181,12 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {

worker.rawState = make(map[util.EtcdKey][]byte)
for _, kv := range resp.Kvs {
worker.rawState[util.NewEtcdKeyFromBytes(kv.Key)] = kv.Value
key := util.NewEtcdKeyFromBytes(kv.Key)
worker.rawState[key] = kv.Value
err := worker.state.Update(key, kv.Value, true)
if err != nil {
return errors.Trace(err)
}
}

worker.revision = resp.Header.Revision
6 changes: 3 additions & 3 deletions pkg/orchestrator/etcd_worker_test.go
@@ -146,7 +146,7 @@ func (s *simpleReactorState) SetSum(sum int) {
s.patches = append(s.patches, patch)
}

func (s *simpleReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
if len(subMatches) != 2 {
log.Panic("illegal Etcd key", zap.ByteString("key", key.Bytes()))
@@ -279,13 +279,13 @@ type intReactorState struct {
isUpdated bool
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
var err error
s.val, err = strconv.Atoi(string(value))
if err != nil {
log.Panic("intReactorState", zap.Error(err))
}
s.isUpdated = true
s.isUpdated = !isInit
return nil
}

2 changes: 1 addition & 1 deletion pkg/orchestrator/interfaces.go
@@ -38,7 +38,7 @@ type DataPatch struct {
// ReactorState models the Etcd state of a reactor
type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of a latest change to the Etcd state.
Update(key util.EtcdKey, value []byte) error
Update(key util.EtcdKey, value []byte, isInit bool) error

// GetPatches is called by EtcdWorker, and should return a slice of data patches that represents the changes
// that a Reactor wants to apply to Etcd.
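The extra isInit parameter on ReactorState.Update lets implementations tell the initial snapshot replayed by syncRawState apart from genuine watch-driven changes; the intReactorState test above uses it to avoid counting the initial load as an update. A small sketch of a state following the same convention (counterState and the simplified key type are hypothetical, not part of the PR):

```go
package main

import "fmt"

// EtcdKey is a simplified stand-in for util.EtcdKey in this sketch.
type EtcdKey string

// counterState records values for every key but counts only
// post-initialization updates.
type counterState struct {
	values  map[EtcdKey][]byte
	updates int
}

// Update follows the (simplified) ReactorState contract: isInit is true for
// keys replayed from the initial Etcd snapshot, false for changes observed
// afterwards through the watch.
func (s *counterState) Update(key EtcdKey, value []byte, isInit bool) error {
	s.values[key] = value
	if !isInit {
		s.updates++
	}
	return nil
}

func main() {
	s := &counterState{values: make(map[EtcdKey][]byte)}
	_ = s.Update("/cdc/status", []byte("100"), true)  // initial snapshot
	_ = s.Update("/cdc/status", []byte("105"), false) // later watch event
	fmt.Println(s.updates) // 1
}
```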
2 changes: 1 addition & 1 deletion pkg/orchestrator/jsonstate/json_reactor_state.go
@@ -59,7 +59,7 @@ func NewJSONReactorState(key string, data interface{}) (*JSONReactorState, error
}

// Update implements the ReactorState interface.
func (s *JSONReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *JSONReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
if key != s.key {
return nil
}