owner: prevent table start-ts regressions (#1236) (#1352)
liuzix authored Jan 27, 2021
1 parent d37384f commit 54f8a37
Showing 60 changed files with 679 additions and 41 deletions.
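
Before the per-file hunks, here is a minimal sketch (not part of the commit; clampStartTs is a hypothetical name) of the invariant the change enforces wherever a table is handed to a capture, in handleMoveTableJobs, removeCapture and cleanUpStaleTasks: the effective start-ts is the maximum of the table's recorded StartTs and the relevant checkpoint, so a start-ts can only move forward.

package sketch

// clampStartTs summarizes the clamp applied when a table is (re)dispatched:
// never hand out a start-ts that is behind the checkpoint already reached,
// otherwise the table would replay data the changefeed considers replicated.
func clampStartTs(startTs, checkpointTs uint64) uint64 {
    if startTs < checkpointTs {
        return checkpointTs
    }
    return startTs
}
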
38 changes: 33 additions & 5 deletions cdc/changefeed.go
@@ -79,6 +79,10 @@ type changeFeed struct {
id string
info *model.ChangeFeedInfo
status *model.ChangeFeedStatus
// The latest checkpointTs already applied to Etcd.
// Any operation that assumes the global checkpoint has advanced
// must check this field to make sure the progress is visible to the processors.
appliedCheckpointTs uint64

schema *entry.SingleSchemaSnapshot
ddlState model.ChangeFeedDDLState
@@ -292,6 +296,11 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model
return nil
}

// Do NOT rebalance orphan tables before checkpoint ts has advanced to FinishTs of a DDL
if c.appliedCheckpointTs != c.status.CheckpointTs {
return nil
}

captureIDs := make(map[model.CaptureID]struct{}, len(captures))
cleanedTables := make(map[model.TableID]struct{})
addedTables := make(map[model.TableID]struct{})
@@ -532,9 +541,20 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
job.Status = model.MoveTableStatusDeleted
log.Info("handle the move job, remove table from the source capture", zap.Reflect("job", job))
case model.MoveTableStatusDeleted:
// Do NOT dispatch tables before checkpoint ts has been flushed to Etcd.
if c.appliedCheckpointTs != c.status.CheckpointTs {
log.Debug("handle the move job, waiting for checkpoint ts to be uploaded",
zap.Uint64("applied-checkpoint-ts", c.appliedCheckpointTs),
zap.Uint64("latest-checkpoint-ts", c.status.CheckpointTs))
continue
}

// add table to target capture
status, exist := cloneStatus(job.To)
replicaInfo := job.TableReplicaInfo.Clone()
if replicaInfo.StartTs < c.status.CheckpointTs {
replicaInfo.StartTs = c.status.CheckpointTs
}
if !exist {
// the target capture does not exist, add the table to orphanTables.
c.orphanTables[tableID] = replicaInfo.StartTs
@@ -614,6 +634,7 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
if c.ddlState != model.ChangeFeedWaitToExecDDL {
return nil
}

if len(c.ddlJobHistory) == 0 {
log.Panic("ddl job history can not be empty in changefeed when should to execute DDL")
}
@@ -624,14 +645,21 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}

if c.status.CheckpointTs != todoDDLJob.BinlogInfo.FinishedTS {
if c.appliedCheckpointTs < todoDDLJob.BinlogInfo.FinishedTS-1 {
log.Debug("wait checkpoint ts",
zap.Uint64("checkpoint ts", c.status.CheckpointTs),
zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS),
zap.String("ddl query", todoDDLJob.Query))
return nil
}

if c.appliedCheckpointTs >= todoDDLJob.BinlogInfo.FinishedTS {
log.Panic("applied checkpoint ts is larger than DDL finish ts",
zap.Uint64("applied checkpoint ts", c.appliedCheckpointTs),
zap.Uint64("finish ts", todoDDLJob.BinlogInfo.FinishedTS))
}

log.Info("apply job", zap.Stringer("job", todoDDLJob),
zap.String("schema", todoDDLJob.SchemaName),
zap.String("query", todoDDLJob.Query),
@@ -669,10 +697,6 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return nil
}

err = c.balanceOrphanTables(ctx, captures)
if err != nil {
return errors.Trace(err)
}
executed := false
if !c.cyclicEnabled || c.info.Config.Cyclic.SyncDDL {
failpoint.Inject("InjectChangefeedDDLError", func() {
@@ -867,6 +891,10 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
c.ddlTs = minResolvedTs
}

if len(c.ddlJobHistory) > 0 && minCheckpointTs >= c.ddlJobHistory[0].BinlogInfo.FinishedTS {
minCheckpointTs = c.ddlJobHistory[0].BinlogInfo.FinishedTS - 1
}

// if the downstream is an MQ sink, it does not guarantee that the checkpoint is less than globalResolvedTs
if minCheckpointTs > minResolvedTs {
minCheckpointTs = minResolvedTs
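
A rough sketch of the gating logic added above (hypothetical names, simplified types; not the commit's code): table moves and rebalancing wait until the in-memory checkpoint has been flushed to Etcd, and a pending DDL is executed only once the flushed checkpoint has caught up to FinishedTS-1, the value at which calcResolvedTs now caps the checkpoint.

package sketch

import "log"

// checkpointFlushed mirrors the new guard in balanceOrphanTables and
// handleMoveTableJobs: act only when the checkpoint visible to the
// processors in Etcd equals the owner's in-memory checkpoint.
func checkpointFlushed(appliedCheckpointTs, checkpointTs uint64) bool {
    return appliedCheckpointTs == checkpointTs
}

// readyToExecDDL mirrors the new condition in handleDDL: wait while the
// flushed checkpoint is still behind FinishedTS-1, and treat a flushed
// checkpoint at or beyond FinishedTS as an invariant violation.
func readyToExecDDL(appliedCheckpointTs, ddlFinishedTs uint64) bool {
    if appliedCheckpointTs < ddlFinishedTs-1 {
        return false
    }
    if appliedCheckpointTs >= ddlFinishedTs {
        log.Panic("applied checkpoint ts must stay below the DDL finish ts until the DDL is executed")
    }
    return true // appliedCheckpointTs == ddlFinishedTs-1
}
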
69 changes: 44 additions & 25 deletions cdc/owner.go
@@ -132,6 +132,10 @@ func NewOwner(
cli := kv.NewCDCEtcdClient(ctx, sess.Client())
endpoints := sess.Client().Endpoints()

failpoint.Inject("ownerFlushIntervalInject", func(val failpoint.Value) {
flushChangefeedInterval = time.Millisecond * time.Duration(val.(int))
})

owner := &Owner{
done: make(chan struct{}),
session: sess,
@@ -187,8 +191,15 @@ func (o *Owner) removeCapture(info *model.CaptureInfo) {
startTs = feed.status.CheckpointTs
}

for tableID := range task.Tables {
for tableID, replicaInfo := range task.Tables {
feed.orphanTables[tableID] = startTs
if startTs < replicaInfo.StartTs {
log.Warn("table startTs not consistent",
zap.Uint64("table-start-ts", replicaInfo.StartTs),
zap.Uint64("checkpoint-ts", startTs),
zap.Reflect("status", feed.status))
feed.orphanTables[tableID] = replicaInfo.StartTs
}
}

ctx := context.TODO()
@@ -412,23 +423,24 @@ func (o *Owner) newChangeFeed(
ResolvedTs: 0,
CheckpointTs: checkpointTs,
},
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncpointStore: syncpointStore,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
cancel: cancel,
appliedCheckpointTs: checkpointTs,
scheduler: scheduler.NewScheduler(info.Config.Scheduler.Tp),
ddlState: model.ChangeFeedSyncDML,
ddlExecutedTs: checkpointTs,
targetTs: info.GetTargetTs(),
ddlTs: 0,
updateResolvedTs: true,
startTimer: make(chan bool),
syncpointStore: syncpointStore,
syncCancel: nil,
taskStatus: processorsInfos,
taskPositions: taskPositions,
etcdCli: o.etcdClient,
filter: filter,
sink: primarySink,
cyclicEnabled: info.Config.Cyclic.IsEnabled(),
lastRebalanceTime: time.Now(),
cancel: cancel,
}
return cf, nil
}
@@ -672,8 +684,8 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
for id, changefeed := range o.changeFeeds {
snapshot[id] = changefeed.status
if changefeed.status.CheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.status.CheckpointTs
if changefeed.appliedCheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.appliedCheckpointTs
}

phyTs := oracle.ExtractPhysical(changefeed.status.CheckpointTs)
@@ -687,6 +699,9 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
for id, changefeedStatus := range snapshot {
o.changeFeeds[id].appliedCheckpointTs = changefeedStatus.CheckpointTs
}
o.lastFlushChangefeeds = time.Now()
}
}
@@ -1187,26 +1202,28 @@ func (o *Owner) run(ctx context.Context) error {
return errors.Trace(err)
}

err = o.calcResolvedTs(ctx)
err = o.handleDDL(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleDDL(ctx)
err = o.handleSyncPoint(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleSyncPoint(ctx)
err = o.handleAdminJob(ctx)
if err != nil {
return errors.Trace(err)
}

err = o.handleAdminJob(ctx)
err = o.calcResolvedTs(ctx)
if err != nil {
return errors.Trace(err)
}

// It is better for flushChangeFeedInfos to follow calcResolvedTs immediately,
// because operations such as handleDDL and rebalancing rely on proper progress of the checkpoint in Etcd.
err = o.flushChangeFeedInfos(ctx)
if err != nil {
return errors.Trace(err)
@@ -1323,7 +1340,9 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context) error {
for tableID, replicaInfo := range status.Tables {
startTs := replicaInfo.StartTs
if taskPosFound {
startTs = pos.CheckPointTs
if startTs < pos.CheckPointTs {
startTs = pos.CheckPointTs
}
}
o.addOrphanTable(changeFeedID, tableID, startTs)
}
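
The owner-side change above boils down to a flush-then-record pattern; the sketch below (stand-in types and a hypothetical PutAllChangeFeedStatus interface, not the commit's code) shows the ordering that makes appliedCheckpointTs trustworthy: the checkpoint is recorded as applied only after the write to Etcd succeeds, and the reordered run loop calls calcResolvedTs right before this flush.

package sketch

import "context"

// The following types are hypothetical stand-ins for the real model and kv types.
type status struct{ CheckpointTs uint64 }

type changefeed struct {
    status              *status
    appliedCheckpointTs uint64
}

type statusWriter interface {
    PutAllChangeFeedStatus(ctx context.Context, s map[string]*status) error
}

// flushAndRecord writes the in-memory statuses to Etcd first and only then
// advances appliedCheckpointTs, so the owner never acts on a checkpoint the
// processors cannot observe yet.
func flushAndRecord(ctx context.Context, cli statusWriter, feeds map[string]*changefeed) error {
    snapshot := make(map[string]*status, len(feeds))
    for id, cf := range feeds {
        snapshot[id] = cf.status
    }
    if err := cli.PutAllChangeFeedStatus(ctx, snapshot); err != nil {
        return err // flush failed: appliedCheckpointTs deliberately stays behind
    }
    for id, st := range snapshot {
        feeds[id].appliedCheckpointTs = st.CheckpointTs
    }
    return nil
}
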
2 changes: 1 addition & 1 deletion cdc/owner_test.go
@@ -922,8 +922,8 @@ func (s *ownerSuite) TestCleanUpStaleTasks(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(len(owner.captures), check.Equals, 1)
c.Assert(owner.captures, check.HasKey, capture.info.ID)
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 110})
c.Assert(atomic.LoadInt32(&owner.captureLoaded), check.Equals, int32(1))
c.Assert(owner.changeFeeds[changefeed].orphanTables, check.DeepEquals, map[model.TableID]model.Ts{51: 100})
// check stale tasks are cleaned up
statuses, err = s.client.GetAllTaskStatus(ctx, changefeed)
c.Assert(err, check.IsNil)
16 changes: 16 additions & 0 deletions cdc/processor.go
@@ -225,6 +225,15 @@ func newProcessor(
p.status = status
p.statusModRevision = modRevision

info, _, err := p.etcdCli.GetChangeFeedStatus(ctx, p.changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, errors.Trace(err)
}

if err == nil {
p.globalcheckpointTs = info.CheckpointTs
}

for tableID, replicaInfo := range p.status.Tables {
p.addTable(ctx, tableID, replicaInfo)
}
@@ -770,6 +779,8 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
// use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized.
// The cdc_state_checker will catch a real inconsistency in integration tests.
log.Warn("addTable: startTs < checkpoint",
util.ZapFieldChangefeed(ctx),
zap.Int64("tableID", tableID),
@@ -1074,6 +1085,11 @@ func (p *processor) sorterConsume(
}
return
}

if checkpointTs < replicaInfo.StartTs {
checkpointTs = replicaInfo.StartTs
}

if checkpointTs != 0 {
atomic.StoreUint64(pCheckpointTs, checkpointTs)
p.localCheckpointTsNotifier.Notify()
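
On the processor side the same rule is applied locally; the sketch below (hypothetical helper name, not the commit's code) shows the clamp added in sorterConsume: a newly added table never reports a local checkpoint below its own start-ts, so it cannot drag the global checkpoint backwards.

package sketch

// localCheckpointTs returns the checkpoint a table should report: at least
// the table's start-ts, even if the sorter has not produced anything newer yet.
func localCheckpointTs(checkpointTs, tableStartTs uint64) uint64 {
    if checkpointTs < tableStartTs {
        return tableStartTs
    }
    return checkpointTs
}
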
9 changes: 7 additions & 2 deletions pkg/orchestrator/etcd_worker.go
@@ -136,7 +136,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, timerInterval time.Duration)

// We are safe to update the ReactorState only if there is no pending patch.
for _, update := range worker.pendingUpdates {
err := worker.state.Update(update.key, update.value)
err := worker.state.Update(update.key, update.value, false)
if err != nil {
return errors.Trace(err)
}
@@ -181,7 +181,12 @@ func (worker *EtcdWorker) syncRawState(ctx context.Context) error {

worker.rawState = make(map[util.EtcdKey][]byte)
for _, kv := range resp.Kvs {
worker.rawState[util.NewEtcdKeyFromBytes(kv.Key)] = kv.Value
key := util.NewEtcdKeyFromBytes(kv.Key)
worker.rawState[key] = kv.Value
err := worker.state.Update(key, kv.Value, true)
if err != nil {
return errors.Trace(err)
}
}

worker.revision = resp.Header.Revision
6 changes: 3 additions & 3 deletions pkg/orchestrator/etcd_worker_test.go
@@ -146,7 +146,7 @@ func (s *simpleReactorState) SetSum(sum int) {
s.patches = append(s.patches, patch)
}

func (s *simpleReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *simpleReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
subMatches := keyParseRegexp.FindSubmatch(key.Bytes())
if len(subMatches) != 2 {
log.Panic("illegal Etcd key", zap.ByteString("key", key.Bytes()))
@@ -279,13 +279,13 @@ type intReactorState struct {
isUpdated bool
}

func (s *intReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *intReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
var err error
s.val, err = strconv.Atoi(string(value))
if err != nil {
log.Panic("intReactorState", zap.Error(err))
}
s.isUpdated = true
s.isUpdated = !isInit
return nil
}

2 changes: 1 addition & 1 deletion pkg/orchestrator/interfaces.go
@@ -38,7 +38,7 @@ type DataPatch struct {
// ReactorState models the Etcd state of a reactor
type ReactorState interface {
// Update is called by EtcdWorker to notify the Reactor of the latest change to the Etcd state.
Update(key util.EtcdKey, value []byte) error
Update(key util.EtcdKey, value []byte, isInit bool) error

// GetPatches is called by EtcdWorker, and should return a slice of data patches that represents the changes
// that a Reactor wants to apply to Etcd.
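
For reference, a minimal implementation of the widened interface (hypothetical types; the real util.EtcdKey and the intReactorState test double differ): syncRawState now replays the initial Etcd snapshot through Update with isInit set to true, so an implementation can absorb the snapshot without treating it as a live change.

package sketch

import "strconv"

// etcdKey is a stand-in for util.EtcdKey.
type etcdKey struct{ key string }

// counterState absorbs every value it is given, but only a non-init update
// marks the state as changed, mirroring isUpdated = !isInit in the tests above.
type counterState struct {
    val       int
    isUpdated bool
}

func (s *counterState) Update(key etcdKey, value []byte, isInit bool) error {
    v, err := strconv.Atoi(string(value))
    if err != nil {
        return err
    }
    s.val = v
    s.isUpdated = !isInit
    return nil
}
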
2 changes: 1 addition & 1 deletion pkg/orchestrator/jsonstate/json_reactor_state.go
@@ -59,7 +59,7 @@ func NewJSONReactorState(key string, data interface{}) (*JSONReactorState, error
}

// Update implements the ReactorState interface.
func (s *JSONReactorState) Update(key util.EtcdKey, value []byte) error {
func (s *JSONReactorState) Update(key util.EtcdKey, value []byte, isInit bool) error {
if key != s.key {
return nil
}