Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-2249-to-release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 19, 2021
2 parents 7fb2b11 + 2e118a6 commit d7f2caf
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,8 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
ownerMaintainTableNumGauge.DeleteLabelValues(cf.id, capture.AdvertiseAddr, maintainTableTypeWip)
}
delete(o.changeFeeds, job.CfID)
changefeedCheckpointTsGauge.DeleteLabelValues(cf.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(cf.id)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ func (s *asyncSinkImpl) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
func (s *asyncSinkImpl) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs)
if ddl.CommitTs <= ddlFinishedTs {
// the DDL event is executed successfully, and done is true
return true, nil
}
if ddl.CommitTs <= s.ddlSentTs {
// the DDL event is executing and not finished yes, return false
return false, nil
}
select {
Expand Down
8 changes: 7 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs)
// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
// when there is a recovery, there is no guarantee that the DDL at the checkpoint
// has been executed. So we need to start the DDL puller from (checkpoint-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -317,6 +320,9 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
case ddlJobBarrier:
ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL()
if ddlJob == nil || ddlResolvedTs != barrierTs {
if ddlResolvedTs < barrierTs {
return barrierTs, nil
}
c.barriers.Update(ddlJobBarrier, ddlResolvedTs)
return barrierTs, nil
}
Expand Down
5 changes: 4 additions & 1 deletion cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
return nil, errors.Trace(err)
}
}
// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
// the DDL's result is already contained in the snapshot.
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -58,7 +61,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
schemaSnapshot: schemaSnap,
filter: f,
config: config,
ddlHandledTs: startTs - 1,
ddlHandledTs: startTs,
}, nil
}

Expand Down
7 changes: 6 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,12 @@ func (p *oldProcessor) stop(ctx context.Context) error {
log.Warn("an error occurred when stopping the processor", zap.Error(err))
errRes = err
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(0)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
return errRes
}

Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,8 @@ func (p *processor) doGCSchemaStorage() error {
func (p *processor) Close() error {
for _, tbl := range p.tables {
tbl.Cancel()
}
for _, tbl := range p.tables {
tbl.Wait()
}
p.cancel()
Expand Down
2 changes: 1 addition & 1 deletion tests/kill_owner_with_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ function run() {

for i in $(seq 1 3); do
kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY
sleep 2
sleep 8
done

export GO_FAILPOINTS=''
Expand Down

0 comments on commit d7f2caf

Please sign in to comment.