Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4038
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
overvenus authored and ti-chi-bot committed Dec 28, 2021
1 parent 39ebddf commit cc5ca0e
Show file tree
Hide file tree
Showing 3 changed files with 3,024 additions and 2,535 deletions.
34 changes: 31 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type changefeed struct {

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
Expand Down Expand Up @@ -265,6 +267,18 @@ LOOP:
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
<<<<<<< HEAD
=======
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
if err != nil {
return errors.Trace(err)
}

>>>>>>> 20e3f139f (metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038))
c.initialized = true
return nil
}
Expand All @@ -286,10 +300,21 @@ func (c *changefeed) releaseResources() {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
<<<<<<< HEAD
=======
c.scheduler.Close(ctx)

>>>>>>> 20e3f139f (metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038))
changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

c.initialized = false
}

Expand Down Expand Up @@ -468,10 +493,13 @@ func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
}
return status, changed, nil
})
phyTs := oracle.ExtractPhysical(checkpointTs)
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close() {
Expand Down
18 changes: 17 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ var (
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -63,7 +77,9 @@ const (
// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
Expand Down
Loading

0 comments on commit cc5ca0e

Please sign in to comment.