diff --git a/cdc/model/owner.go b/cdc/model/owner.go index 82d535bfe2f..64faa7e21bc 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -209,6 +209,8 @@ type TaskStatus struct { Operation map[TableID]*TableOperation `json:"operation"` AdminJobType AdminJobType `json:"admin-job-type"` ModRevision int64 `json:"-"` + // true means Operation record has been changed + Dirty bool `json:"-"` } // String implements fmt.Stringer interface. diff --git a/cdc/processor.go b/cdc/processor.go index 59662ef8726..59001ef130f 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -210,6 +210,8 @@ func newProcessor( schemaStorage: schemaStorage, errCh: errCh, + flushCheckpointInterval: flushCheckpointInterval, + position: &model.TaskPosition{CheckPointTs: checkpointTs}, output: make(chan *model.PolymorphicEvent, defaultOutputChanSize), @@ -534,11 +536,15 @@ func (p *processor) flushTaskStatusAndPosition(ctx context.Context) error { if err != nil { return false, backoff.Permanent(errors.Trace(err)) } - err = p.flushTaskPosition(ctx) - if err != nil { - return true, errors.Trace(err) + // processor reads latest task status from etcd, analyzes operation + // field and processes table add or delete. If operation is unapplied + // but stays unchanged after processor handling tables, it means no + // status is changed and we don't need to flush task status neigher. + if !taskStatus.Dirty { + return false, nil } - return true, nil + err = p.flushTaskPosition(ctx) + return true, err }) if err != nil { // not need to check error @@ -598,6 +604,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) log.Warn("table which will be deleted is not found", zap.Int64("tableID", tableID)) opt.Done = true opt.Status = model.OperFinished + status.Dirty = true continue } stopped, checkpointTs := table.safeStop() @@ -610,6 +617,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) opt.Done = true opt.Status = model.OperFinished } + status.Dirty = true } } } else { @@ -622,6 +630,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) } p.addTable(ctx, tableID, replicaInfo) opt.Status = model.OperProcessed + status.Dirty = true } } @@ -639,6 +648,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) } status.Operation[tableID].Done = true status.Operation[tableID].Status = model.OperFinished + status.Dirty = true default: goto done } @@ -646,6 +656,9 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) done: if !status.SomeOperationsUnapplied() { status.Operation = nil + // status.Dirty must be true when status changes from `unapplied` to `applied`, + // setting status.Dirty = true is not **must** here. + status.Dirty = true } return tablesToRemove, nil }