Skip to content

Commit

Permalink
processor: fix task status flushed too many before table is initializ…
Browse files Browse the repository at this point in the history
…ed (#1190) #1191

cherry pick #1190 to release-4.0 (#1191)
  • Loading branch information
ti-srebot authored Dec 10, 2020
1 parent ba68bc1 commit 402379a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ type TaskStatus struct {
Operation map[TableID]*TableOperation `json:"operation"`
AdminJobType AdminJobType `json:"admin-job-type"`
ModRevision int64 `json:"-"`
// true means Operation record has been changed
Dirty bool `json:"-"`
}

// String implements fmt.Stringer interface.
Expand Down
21 changes: 17 additions & 4 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func newProcessor(
schemaStorage: schemaStorage,
errCh: errCh,

flushCheckpointInterval: flushCheckpointInterval,

position: &model.TaskPosition{CheckPointTs: checkpointTs},
output: make(chan *model.PolymorphicEvent, defaultOutputChanSize),

Expand Down Expand Up @@ -534,11 +536,15 @@ func (p *processor) flushTaskStatusAndPosition(ctx context.Context) error {
if err != nil {
return false, backoff.Permanent(errors.Trace(err))
}
err = p.flushTaskPosition(ctx)
if err != nil {
return true, errors.Trace(err)
// processor reads latest task status from etcd, analyzes operation
// field and processes table add or delete. If operation is unapplied
// but stays unchanged after processor handling tables, it means no
// status is changed and we don't need to flush task status neigher.
if !taskStatus.Dirty {
return false, nil
}
return true, nil
err = p.flushTaskPosition(ctx)
return true, err
})
if err != nil {
// not need to check error
Expand Down Expand Up @@ -598,6 +604,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus)
log.Warn("table which will be deleted is not found", zap.Int64("tableID", tableID))
opt.Done = true
opt.Status = model.OperFinished
status.Dirty = true
continue
}
stopped, checkpointTs := table.safeStop()
Expand All @@ -610,6 +617,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus)
opt.Done = true
opt.Status = model.OperFinished
}
status.Dirty = true
}
}
} else {
Expand All @@ -622,6 +630,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus)
}
p.addTable(ctx, tableID, replicaInfo)
opt.Status = model.OperProcessed
status.Dirty = true
}
}

Expand All @@ -639,13 +648,17 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus)
}
status.Operation[tableID].Done = true
status.Operation[tableID].Status = model.OperFinished
status.Dirty = true
default:
goto done
}
}
done:
if !status.SomeOperationsUnapplied() {
status.Operation = nil
// status.Dirty must be true when status changes from `unapplied` to `applied`,
// setting status.Dirty = true is not **must** here.
status.Dirty = true
}
return tablesToRemove, nil
}
Expand Down

0 comments on commit 402379a

Please sign in to comment.