diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index dba1778934c..d1613bce0fa 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -759,6 +759,10 @@ func (c *changefeed) cleanupMetrics() { changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedBarrierTsGauge = nil + + if c.isRemoved { + changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) + } } // cleanup redo logs if changefeed is removed and redo log is enabled diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index e5067edc718..94fd7640704 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -904,6 +904,10 @@ func (p *processor) Close() error { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID)) + // clean up metrics first to avoid some metrics are not cleaned up + // when error occurs during closing the processor + p.cleanupMetrics() + p.sinkManager.stop() p.sinkManager.r = nil p.sourceManager.stop() @@ -941,7 +945,6 @@ func (p *processor) Close() error { // mark tables share the same cdcContext with its original table, don't need to cancel failpoint.Inject("processorStopDelay", nil) - p.cleanupMetrics() log.Info("processor closed", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID))