Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9174
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Jun 12, 2023
1 parent 258acb3 commit 64dbe77
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,14 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedBarrierTsGauge = nil

<<<<<<< HEAD
c.isReleased = true
c.initialized = false
=======
if c.isRemoved {
changefeedStatusGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
}
>>>>>>> a6939c3156 (changefeed (ticdc): remove status of a changefeed after it is removed (#9174))
}

// redoManagerCleanup cleanups redo logs if changefeed is removed and redo log is enabled
Expand Down
26 changes: 26 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,13 +910,27 @@ func (p *processor) Close() error {
p.wg.Wait()
p.upStream.Release()

<<<<<<< HEAD
if p.agent == nil {
return nil
}
if err := p.agent.Close(); err != nil {
return errors.Trace(err)
}
p.agent = nil
=======
// clean up metrics first to avoid some metrics are not cleaned up
// when error occurs during closing the processor
p.cleanupMetrics()

p.sinkManager.stop()
p.sinkManager.r = nil
p.sourceManager.stop()
p.sourceManager.r = nil
p.redo.stop()
p.mg.stop()
p.ddlHandler.stop()
>>>>>>> a6939c3156 (changefeed (ticdc): remove status of a changefeed after it is removed (#9174))

// sink close might be time-consuming, do it the last.
if p.sink != nil {
Expand All @@ -941,10 +955,22 @@ func (p *processor) Close() error {
}
// mark tables share the same cdcContext with its original table, don't need to cancel
failpoint.Inject("processorStopDelay", nil)
<<<<<<< HEAD
resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
=======

log.Info("processor closed",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))

return nil
}

func (p *processor) cleanupMetrics() {
>>>>>>> a6939c3156 (changefeed (ticdc): remove status of a changefeed after it is removed (#9174))
syncTableNumGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorErrorCounter.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
Expand Down

0 comments on commit 64dbe77

Please sign in to comment.