diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index ae90b5e7079..e9ed1691356 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -774,6 +774,10 @@ func (c *changefeed) cleanupMetrics() { changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedBarrierTsGauge = nil + + if c.isRemoved { + changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) + } } // cleanup redo logs if changefeed is removed and redo log is enabled diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8289c0579f1..18c4ecdb2c9 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -1228,6 +1228,7 @@ func (p *processor) Close(ctx cdcContext.Context) error { log.Info("processor closing ...", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) +<<<<<<< HEAD p.cancel() if p.pullBasedSinking { if p.sinkManager != nil { @@ -1293,6 +1294,24 @@ func (p *processor) Close(ctx cdcContext.Context) error { return errors.Trace(err) } log.Info("processor close sink success", +======= + + // clean up metrics first to avoid some metrics are not cleaned up + // when error occurs during closing the processor + p.cleanupMetrics() + + p.sinkManager.stop() + p.sinkManager.r = nil + p.sourceManager.stop() + p.sourceManager.r = nil + p.redo.stop() + p.mg.stop() + p.ddlHandler.stop() + + if p.globalVars != nil && p.globalVars.SortEngineFactory != nil { + if err := p.globalVars.SortEngineFactory.Drop(p.changefeedID); err != nil { + log.Error("Processor drop event sort engine fail", +>>>>>>> a6939c3156 (changefeed (ticdc): remove status of a changefeed after it is removed (#9174)) zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Duration("duration", time.Since(start))) @@ -1326,7 +1345,6 @@ func (p *processor) Close(ctx cdcContext.Context) error { // mark tables share the same cdcContext with its original table, don't need to cancel failpoint.Inject("processorStopDelay", nil) - p.cleanupMetrics() log.Info("processor closed", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID))